xref: /freebsd/sys/rpc/svc.c (revision 3823d5e198425b4f5e5a80267d195769d1063773)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
/*
 * SVC_VERSQUIET is a flag bit; version_keepquiet() tests it in the
 * xp_flags word reached through SVC_EXT() for the given transport.
 */
#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

/* Forward declarations of file-local helpers. */
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, int delta);
static bool_t svc_request_space_available(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

/* Sysctl handlers attached to a pool's sysctl tree by svcpool_create(). */
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84 
85 SVCPOOL*
86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87 {
88 	SVCPOOL *pool;
89 	SVCGROUP *grp;
90 	int g;
91 
92 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93 
94 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95 	pool->sp_name = name;
96 	pool->sp_state = SVCPOOL_INIT;
97 	pool->sp_proc = NULL;
98 	TAILQ_INIT(&pool->sp_callouts);
99 	TAILQ_INIT(&pool->sp_lcallouts);
100 	pool->sp_minthreads = 1;
101 	pool->sp_maxthreads = 1;
102 	pool->sp_groupcount = 1;
103 	for (g = 0; g < SVC_MAXGROUPS; g++) {
104 		grp = &pool->sp_groups[g];
105 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106 		grp->sg_pool = pool;
107 		grp->sg_state = SVCPOOL_ACTIVE;
108 		TAILQ_INIT(&grp->sg_xlist);
109 		TAILQ_INIT(&grp->sg_active);
110 		LIST_INIT(&grp->sg_idlethreads);
111 		grp->sg_minthreads = 1;
112 		grp->sg_maxthreads = 1;
113 	}
114 
115 	/*
116 	 * Don't use more than a quarter of mbuf clusters or more than
117 	 * 45Mb buffering requests.
118 	 */
119 	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
120 	if (pool->sp_space_high > 45 << 20)
121 		pool->sp_space_high = 45 << 20;
122 	pool->sp_space_low = 2 * pool->sp_space_high / 3;
123 
124 	sysctl_ctx_init(&pool->sp_sysctl);
125 	if (sysctl_base) {
126 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
128 		    pool, 0, svcpool_minthread_sysctl, "I",
129 		    "Minimal number of threads");
130 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
132 		    pool, 0, svcpool_maxthread_sysctl, "I",
133 		    "Maximal number of threads");
134 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135 		    "threads", CTLTYPE_INT | CTLFLAG_RD,
136 		    pool, 0, svcpool_threads_sysctl, "I",
137 		    "Current number of threads");
138 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
140 		    "Number of thread groups");
141 
142 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143 		    "request_space_used", CTLFLAG_RD,
144 		    &pool->sp_space_used, 0,
145 		    "Space in parsed but not handled requests.");
146 
147 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
148 		    "request_space_used_highest", CTLFLAG_RD,
149 		    &pool->sp_space_used_highest, 0,
150 		    "Highest space used since reboot.");
151 
152 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
153 		    "request_space_high", CTLFLAG_RW,
154 		    &pool->sp_space_high, 0,
155 		    "Maximum space in parsed but not handled requests.");
156 
157 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
158 		    "request_space_low", CTLFLAG_RW,
159 		    &pool->sp_space_low, 0,
160 		    "Low water mark for request space.");
161 
162 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
163 		    "request_space_throttled", CTLFLAG_RD,
164 		    &pool->sp_space_throttled, 0,
165 		    "Whether nfs requests are currently throttled");
166 
167 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
168 		    "request_space_throttle_count", CTLFLAG_RD,
169 		    &pool->sp_space_throttle_count, 0,
170 		    "Count of times throttling based on request space has occurred");
171 	}
172 
173 	return pool;
174 }
175 
176 void
177 svcpool_destroy(SVCPOOL *pool)
178 {
179 	SVCGROUP *grp;
180 	SVCXPRT *xprt, *nxprt;
181 	struct svc_callout *s;
182 	struct svc_loss_callout *sl;
183 	struct svcxprt_list cleanup;
184 	int g;
185 
186 	TAILQ_INIT(&cleanup);
187 
188 	for (g = 0; g < SVC_MAXGROUPS; g++) {
189 		grp = &pool->sp_groups[g];
190 		mtx_lock(&grp->sg_lock);
191 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
192 			xprt_unregister_locked(xprt);
193 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
194 		}
195 		mtx_unlock(&grp->sg_lock);
196 	}
197 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
198 		SVC_RELEASE(xprt);
199 	}
200 
201 	mtx_lock(&pool->sp_lock);
202 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
203 		mtx_unlock(&pool->sp_lock);
204 		svc_unreg(pool, s->sc_prog, s->sc_vers);
205 		mtx_lock(&pool->sp_lock);
206 	}
207 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
208 		mtx_unlock(&pool->sp_lock);
209 		svc_loss_unreg(pool, sl->slc_dispatch);
210 		mtx_lock(&pool->sp_lock);
211 	}
212 	mtx_unlock(&pool->sp_lock);
213 
214 	for (g = 0; g < SVC_MAXGROUPS; g++) {
215 		grp = &pool->sp_groups[g];
216 		mtx_destroy(&grp->sg_lock);
217 	}
218 	mtx_destroy(&pool->sp_lock);
219 
220 	if (pool->sp_rcache)
221 		replay_freecache(pool->sp_rcache);
222 
223 	sysctl_ctx_free(&pool->sp_sysctl);
224 	free(pool, M_RPC);
225 }
226 
227 /*
228  * Sysctl handler to get the present thread count on a pool
229  */
230 static int
231 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
232 {
233 	SVCPOOL *pool;
234 	int threads, error, g;
235 
236 	pool = oidp->oid_arg1;
237 	threads = 0;
238 	mtx_lock(&pool->sp_lock);
239 	for (g = 0; g < pool->sp_groupcount; g++)
240 		threads += pool->sp_groups[g].sg_threadcount;
241 	mtx_unlock(&pool->sp_lock);
242 	error = sysctl_handle_int(oidp, &threads, 0, req);
243 	return (error);
244 }
245 
246 /*
247  * Sysctl handler to set the minimum thread count on a pool
248  */
249 static int
250 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
251 {
252 	SVCPOOL *pool;
253 	int newminthreads, error, g;
254 
255 	pool = oidp->oid_arg1;
256 	newminthreads = pool->sp_minthreads;
257 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
258 	if (error == 0 && newminthreads != pool->sp_minthreads) {
259 		if (newminthreads > pool->sp_maxthreads)
260 			return (EINVAL);
261 		mtx_lock(&pool->sp_lock);
262 		pool->sp_minthreads = newminthreads;
263 		for (g = 0; g < pool->sp_groupcount; g++) {
264 			pool->sp_groups[g].sg_minthreads = max(1,
265 			    pool->sp_minthreads / pool->sp_groupcount);
266 		}
267 		mtx_unlock(&pool->sp_lock);
268 	}
269 	return (error);
270 }
271 
272 /*
273  * Sysctl handler to set the maximum thread count on a pool
274  */
275 static int
276 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
277 {
278 	SVCPOOL *pool;
279 	int newmaxthreads, error, g;
280 
281 	pool = oidp->oid_arg1;
282 	newmaxthreads = pool->sp_maxthreads;
283 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
284 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
285 		if (newmaxthreads < pool->sp_minthreads)
286 			return (EINVAL);
287 		mtx_lock(&pool->sp_lock);
288 		pool->sp_maxthreads = newmaxthreads;
289 		for (g = 0; g < pool->sp_groupcount; g++) {
290 			pool->sp_groups[g].sg_maxthreads = max(1,
291 			    pool->sp_maxthreads / pool->sp_groupcount);
292 		}
293 		mtx_unlock(&pool->sp_lock);
294 	}
295 	return (error);
296 }
297 
298 /*
299  * Activate a transport handle.
300  */
301 void
302 xprt_register(SVCXPRT *xprt)
303 {
304 	SVCPOOL *pool = xprt->xp_pool;
305 	SVCGROUP *grp;
306 	int g;
307 
308 	SVC_ACQUIRE(xprt);
309 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
310 	xprt->xp_group = grp = &pool->sp_groups[g];
311 	mtx_lock(&grp->sg_lock);
312 	xprt->xp_registered = TRUE;
313 	xprt->xp_active = FALSE;
314 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
315 	mtx_unlock(&grp->sg_lock);
316 }
317 
318 /*
319  * De-activate a transport handle. Note: the locked version doesn't
320  * release the transport - caller must do that after dropping the pool
321  * lock.
322  */
323 static void
324 xprt_unregister_locked(SVCXPRT *xprt)
325 {
326 	SVCGROUP *grp = xprt->xp_group;
327 
328 	mtx_assert(&grp->sg_lock, MA_OWNED);
329 	KASSERT(xprt->xp_registered == TRUE,
330 	    ("xprt_unregister_locked: not registered"));
331 	xprt_inactive_locked(xprt);
332 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
333 	xprt->xp_registered = FALSE;
334 }
335 
336 void
337 xprt_unregister(SVCXPRT *xprt)
338 {
339 	SVCGROUP *grp = xprt->xp_group;
340 
341 	mtx_lock(&grp->sg_lock);
342 	if (xprt->xp_registered == FALSE) {
343 		/* Already unregistered by another thread */
344 		mtx_unlock(&grp->sg_lock);
345 		return;
346 	}
347 	xprt_unregister_locked(xprt);
348 	mtx_unlock(&grp->sg_lock);
349 
350 	SVC_RELEASE(xprt);
351 }
352 
353 /*
354  * Attempt to assign a service thread to this transport.
355  */
356 static int
357 xprt_assignthread(SVCXPRT *xprt)
358 {
359 	SVCGROUP *grp = xprt->xp_group;
360 	SVCTHREAD *st;
361 
362 	mtx_assert(&grp->sg_lock, MA_OWNED);
363 	st = LIST_FIRST(&grp->sg_idlethreads);
364 	if (st) {
365 		LIST_REMOVE(st, st_ilink);
366 		SVC_ACQUIRE(xprt);
367 		xprt->xp_thread = st;
368 		st->st_xprt = xprt;
369 		cv_signal(&st->st_cond);
370 		return (TRUE);
371 	} else {
372 		/*
373 		 * See if we can create a new thread. The
374 		 * actual thread creation happens in
375 		 * svc_run_internal because our locking state
376 		 * is poorly defined (we are typically called
377 		 * from a socket upcall). Don't create more
378 		 * than one thread per second.
379 		 */
380 		if (grp->sg_state == SVCPOOL_ACTIVE
381 		    && grp->sg_lastcreatetime < time_uptime
382 		    && grp->sg_threadcount < grp->sg_maxthreads) {
383 			grp->sg_state = SVCPOOL_THREADWANTED;
384 		}
385 	}
386 	return (FALSE);
387 }
388 
389 void
390 xprt_active(SVCXPRT *xprt)
391 {
392 	SVCGROUP *grp = xprt->xp_group;
393 
394 	mtx_lock(&grp->sg_lock);
395 
396 	if (!xprt->xp_registered) {
397 		/*
398 		 * Race with xprt_unregister - we lose.
399 		 */
400 		mtx_unlock(&grp->sg_lock);
401 		return;
402 	}
403 
404 	if (!xprt->xp_active) {
405 		xprt->xp_active = TRUE;
406 		if (xprt->xp_thread == NULL) {
407 			if (!svc_request_space_available(xprt->xp_pool) ||
408 			    !xprt_assignthread(xprt))
409 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
410 				    xp_alink);
411 		}
412 	}
413 
414 	mtx_unlock(&grp->sg_lock);
415 }
416 
417 void
418 xprt_inactive_locked(SVCXPRT *xprt)
419 {
420 	SVCGROUP *grp = xprt->xp_group;
421 
422 	mtx_assert(&grp->sg_lock, MA_OWNED);
423 	if (xprt->xp_active) {
424 		if (xprt->xp_thread == NULL)
425 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
426 		xprt->xp_active = FALSE;
427 	}
428 }
429 
430 void
431 xprt_inactive(SVCXPRT *xprt)
432 {
433 	SVCGROUP *grp = xprt->xp_group;
434 
435 	mtx_lock(&grp->sg_lock);
436 	xprt_inactive_locked(xprt);
437 	mtx_unlock(&grp->sg_lock);
438 }
439 
440 /*
441  * Variant of xprt_inactive() for use only when sure that port is
442  * assigned to thread. For example, withing receive handlers.
443  */
444 void
445 xprt_inactive_self(SVCXPRT *xprt)
446 {
447 
448 	KASSERT(xprt->xp_thread != NULL,
449 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
450 	xprt->xp_active = FALSE;
451 }
452 
453 /*
454  * Add a service program to the callout list.
455  * The dispatch routine will be called when a rpc request for this
456  * program number comes in.
457  */
458 bool_t
459 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
460     void (*dispatch)(struct svc_req *, SVCXPRT *),
461     const struct netconfig *nconf)
462 {
463 	SVCPOOL *pool = xprt->xp_pool;
464 	struct svc_callout *s;
465 	char *netid = NULL;
466 	int flag = 0;
467 
468 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
469 
470 	if (xprt->xp_netid) {
471 		netid = strdup(xprt->xp_netid, M_RPC);
472 		flag = 1;
473 	} else if (nconf && nconf->nc_netid) {
474 		netid = strdup(nconf->nc_netid, M_RPC);
475 		flag = 1;
476 	} /* must have been created with svc_raw_create */
477 	if ((netid == NULL) && (flag == 1)) {
478 		return (FALSE);
479 	}
480 
481 	mtx_lock(&pool->sp_lock);
482 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
483 		if (netid)
484 			free(netid, M_RPC);
485 		if (s->sc_dispatch == dispatch)
486 			goto rpcb_it; /* he is registering another xptr */
487 		mtx_unlock(&pool->sp_lock);
488 		return (FALSE);
489 	}
490 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
491 	if (s == NULL) {
492 		if (netid)
493 			free(netid, M_RPC);
494 		mtx_unlock(&pool->sp_lock);
495 		return (FALSE);
496 	}
497 
498 	s->sc_prog = prog;
499 	s->sc_vers = vers;
500 	s->sc_dispatch = dispatch;
501 	s->sc_netid = netid;
502 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
503 
504 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
505 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
506 
507 rpcb_it:
508 	mtx_unlock(&pool->sp_lock);
509 	/* now register the information with the local binder service */
510 	if (nconf) {
511 		bool_t dummy;
512 		struct netconfig tnc;
513 		struct netbuf nb;
514 		tnc = *nconf;
515 		nb.buf = &xprt->xp_ltaddr;
516 		nb.len = xprt->xp_ltaddr.ss_len;
517 		dummy = rpcb_set(prog, vers, &tnc, &nb);
518 		return (dummy);
519 	}
520 	return (TRUE);
521 }
522 
523 /*
524  * Remove a service program from the callout list.
525  */
526 void
527 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
528 {
529 	struct svc_callout *s;
530 
531 	/* unregister the information anyway */
532 	(void) rpcb_unset(prog, vers, NULL);
533 	mtx_lock(&pool->sp_lock);
534 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
535 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
536 		if (s->sc_netid)
537 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
538 		mem_free(s, sizeof (struct svc_callout));
539 	}
540 	mtx_unlock(&pool->sp_lock);
541 }
542 
543 /*
544  * Add a service connection loss program to the callout list.
545  * The dispatch routine will be called when some port in ths pool die.
546  */
547 bool_t
548 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
549 {
550 	SVCPOOL *pool = xprt->xp_pool;
551 	struct svc_loss_callout *s;
552 
553 	mtx_lock(&pool->sp_lock);
554 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
555 		if (s->slc_dispatch == dispatch)
556 			break;
557 	}
558 	if (s != NULL) {
559 		mtx_unlock(&pool->sp_lock);
560 		return (TRUE);
561 	}
562 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
563 	if (s == NULL) {
564 		mtx_unlock(&pool->sp_lock);
565 		return (FALSE);
566 	}
567 	s->slc_dispatch = dispatch;
568 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
569 	mtx_unlock(&pool->sp_lock);
570 	return (TRUE);
571 }
572 
573 /*
574  * Remove a service connection loss program from the callout list.
575  */
576 void
577 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
578 {
579 	struct svc_loss_callout *s;
580 
581 	mtx_lock(&pool->sp_lock);
582 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
583 		if (s->slc_dispatch == dispatch) {
584 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
585 			free(s, M_RPC);
586 			break;
587 		}
588 	}
589 	mtx_unlock(&pool->sp_lock);
590 }
591 
592 /* ********************** CALLOUT list related stuff ************* */
593 
594 /*
595  * Search the callout list for a program number, return the callout
596  * struct.
597  */
598 static struct svc_callout *
599 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
600 {
601 	struct svc_callout *s;
602 
603 	mtx_assert(&pool->sp_lock, MA_OWNED);
604 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
605 		if (s->sc_prog == prog && s->sc_vers == vers
606 		    && (netid == NULL || s->sc_netid == NULL ||
607 			strcmp(netid, s->sc_netid) == 0))
608 			break;
609 	}
610 
611 	return (s);
612 }
613 
614 /* ******************* REPLY GENERATION ROUTINES  ************ */
615 
616 static bool_t
617 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
618     struct mbuf *body)
619 {
620 	SVCXPRT *xprt = rqstp->rq_xprt;
621 	bool_t ok;
622 
623 	if (rqstp->rq_args) {
624 		m_freem(rqstp->rq_args);
625 		rqstp->rq_args = NULL;
626 	}
627 
628 	if (xprt->xp_pool->sp_rcache)
629 		replay_setreply(xprt->xp_pool->sp_rcache,
630 		    rply, svc_getrpccaller(rqstp), body);
631 
632 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
633 		return (FALSE);
634 
635 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
636 	if (rqstp->rq_addr) {
637 		free(rqstp->rq_addr, M_SONAME);
638 		rqstp->rq_addr = NULL;
639 	}
640 
641 	return (ok);
642 }
643 
644 /*
645  * Send a reply to an rpc request
646  */
647 bool_t
648 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
649 {
650 	struct rpc_msg rply;
651 	struct mbuf *m;
652 	XDR xdrs;
653 	bool_t ok;
654 
655 	rply.rm_xid = rqstp->rq_xid;
656 	rply.rm_direction = REPLY;
657 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
658 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
659 	rply.acpted_rply.ar_stat = SUCCESS;
660 	rply.acpted_rply.ar_results.where = NULL;
661 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
662 
663 	m = m_getcl(M_WAITOK, MT_DATA, 0);
664 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
665 	ok = xdr_results(&xdrs, xdr_location);
666 	XDR_DESTROY(&xdrs);
667 
668 	if (ok) {
669 		return (svc_sendreply_common(rqstp, &rply, m));
670 	} else {
671 		m_freem(m);
672 		return (FALSE);
673 	}
674 }
675 
676 bool_t
677 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
678 {
679 	struct rpc_msg rply;
680 
681 	rply.rm_xid = rqstp->rq_xid;
682 	rply.rm_direction = REPLY;
683 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
684 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
685 	rply.acpted_rply.ar_stat = SUCCESS;
686 	rply.acpted_rply.ar_results.where = NULL;
687 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
688 
689 	return (svc_sendreply_common(rqstp, &rply, m));
690 }
691 
692 /*
693  * No procedure error reply
694  */
695 void
696 svcerr_noproc(struct svc_req *rqstp)
697 {
698 	SVCXPRT *xprt = rqstp->rq_xprt;
699 	struct rpc_msg rply;
700 
701 	rply.rm_xid = rqstp->rq_xid;
702 	rply.rm_direction = REPLY;
703 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
704 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
705 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
706 
707 	if (xprt->xp_pool->sp_rcache)
708 		replay_setreply(xprt->xp_pool->sp_rcache,
709 		    &rply, svc_getrpccaller(rqstp), NULL);
710 
711 	svc_sendreply_common(rqstp, &rply, NULL);
712 }
713 
714 /*
715  * Can't decode args error reply
716  */
717 void
718 svcerr_decode(struct svc_req *rqstp)
719 {
720 	SVCXPRT *xprt = rqstp->rq_xprt;
721 	struct rpc_msg rply;
722 
723 	rply.rm_xid = rqstp->rq_xid;
724 	rply.rm_direction = REPLY;
725 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
726 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
727 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
728 
729 	if (xprt->xp_pool->sp_rcache)
730 		replay_setreply(xprt->xp_pool->sp_rcache,
731 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
732 
733 	svc_sendreply_common(rqstp, &rply, NULL);
734 }
735 
736 /*
737  * Some system error
738  */
739 void
740 svcerr_systemerr(struct svc_req *rqstp)
741 {
742 	SVCXPRT *xprt = rqstp->rq_xprt;
743 	struct rpc_msg rply;
744 
745 	rply.rm_xid = rqstp->rq_xid;
746 	rply.rm_direction = REPLY;
747 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
748 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
749 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
750 
751 	if (xprt->xp_pool->sp_rcache)
752 		replay_setreply(xprt->xp_pool->sp_rcache,
753 		    &rply, svc_getrpccaller(rqstp), NULL);
754 
755 	svc_sendreply_common(rqstp, &rply, NULL);
756 }
757 
758 /*
759  * Authentication error reply
760  */
761 void
762 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
763 {
764 	SVCXPRT *xprt = rqstp->rq_xprt;
765 	struct rpc_msg rply;
766 
767 	rply.rm_xid = rqstp->rq_xid;
768 	rply.rm_direction = REPLY;
769 	rply.rm_reply.rp_stat = MSG_DENIED;
770 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
771 	rply.rjcted_rply.rj_why = why;
772 
773 	if (xprt->xp_pool->sp_rcache)
774 		replay_setreply(xprt->xp_pool->sp_rcache,
775 		    &rply, svc_getrpccaller(rqstp), NULL);
776 
777 	svc_sendreply_common(rqstp, &rply, NULL);
778 }
779 
780 /*
781  * Auth too weak error reply
782  */
783 void
784 svcerr_weakauth(struct svc_req *rqstp)
785 {
786 
787 	svcerr_auth(rqstp, AUTH_TOOWEAK);
788 }
789 
790 /*
791  * Program unavailable error reply
792  */
793 void
794 svcerr_noprog(struct svc_req *rqstp)
795 {
796 	SVCXPRT *xprt = rqstp->rq_xprt;
797 	struct rpc_msg rply;
798 
799 	rply.rm_xid = rqstp->rq_xid;
800 	rply.rm_direction = REPLY;
801 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
802 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
803 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
804 
805 	if (xprt->xp_pool->sp_rcache)
806 		replay_setreply(xprt->xp_pool->sp_rcache,
807 		    &rply, svc_getrpccaller(rqstp), NULL);
808 
809 	svc_sendreply_common(rqstp, &rply, NULL);
810 }
811 
812 /*
813  * Program version mismatch error reply
814  */
815 void
816 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
817 {
818 	SVCXPRT *xprt = rqstp->rq_xprt;
819 	struct rpc_msg rply;
820 
821 	rply.rm_xid = rqstp->rq_xid;
822 	rply.rm_direction = REPLY;
823 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
824 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
825 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
826 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
827 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
828 
829 	if (xprt->xp_pool->sp_rcache)
830 		replay_setreply(xprt->xp_pool->sp_rcache,
831 		    &rply, svc_getrpccaller(rqstp), NULL);
832 
833 	svc_sendreply_common(rqstp, &rply, NULL);
834 }
835 
836 /*
837  * Allocate a new server transport structure. All fields are
838  * initialized to zero and xp_p3 is initialized to point at an
839  * extension structure to hold various flags and authentication
840  * parameters.
841  */
842 SVCXPRT *
843 svc_xprt_alloc()
844 {
845 	SVCXPRT *xprt;
846 	SVCXPRT_EXT *ext;
847 
848 	xprt = mem_alloc(sizeof(SVCXPRT));
849 	memset(xprt, 0, sizeof(SVCXPRT));
850 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
851 	memset(ext, 0, sizeof(SVCXPRT_EXT));
852 	xprt->xp_p3 = ext;
853 	refcount_init(&xprt->xp_refs, 1);
854 
855 	return (xprt);
856 }
857 
858 /*
859  * Free a server transport structure.
860  */
861 void
862 svc_xprt_free(xprt)
863 	SVCXPRT *xprt;
864 {
865 
866 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
867 	mem_free(xprt, sizeof(SVCXPRT));
868 }
869 
870 /* ******************* SERVER INPUT STUFF ******************* */
871 
/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 *
 * On success the parsed request is stored in *rqstp_ret and ownership
 * passes to the caller; on every other path the request is freed here
 * and *rqstp_ret is left untouched.
 * NOTE(review): callers presumably preset *rqstp_ret to NULL - confirm.
 *
 * Returns the transport status from SVC_STAT(). If that is XPRT_DIED,
 * the pool's connection-loss callouts are invoked and the transport is
 * unregistered before returning.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	/*
	 * The credential, verifier and client credential areas are
	 * consecutive MAX_AUTH_BYTES-sized slices of rq_credarea.
	 */
	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				/* Not seen before: process normally. */
				break;
			case RS_DONE:
				/* Resend the cached reply, drop the call. */
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				/* Any other replay state: drop the call. */
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		/* From here on the argument mbufs belong to the request. */
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	/* r is still set only if the request was not handed to the caller. */
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		/* Tell every registered loss callout, then unregister. */
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
970 
971 static void
972 svc_executereq(struct svc_req *rqstp)
973 {
974 	SVCXPRT *xprt = rqstp->rq_xprt;
975 	SVCPOOL *pool = xprt->xp_pool;
976 	int prog_found;
977 	rpcvers_t low_vers;
978 	rpcvers_t high_vers;
979 	struct svc_callout *s;
980 
981 	/* now match message with a registered service*/
982 	prog_found = FALSE;
983 	low_vers = (rpcvers_t) -1L;
984 	high_vers = (rpcvers_t) 0L;
985 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
986 		if (s->sc_prog == rqstp->rq_prog) {
987 			if (s->sc_vers == rqstp->rq_vers) {
988 				/*
989 				 * We hand ownership of r to the
990 				 * dispatch method - they must call
991 				 * svc_freereq.
992 				 */
993 				(*s->sc_dispatch)(rqstp, xprt);
994 				return;
995 			}  /* found correct version */
996 			prog_found = TRUE;
997 			if (s->sc_vers < low_vers)
998 				low_vers = s->sc_vers;
999 			if (s->sc_vers > high_vers)
1000 				high_vers = s->sc_vers;
1001 		}   /* found correct program */
1002 	}
1003 
1004 	/*
1005 	 * if we got here, the program or version
1006 	 * is not served ...
1007 	 */
1008 	if (prog_found)
1009 		svcerr_progvers(rqstp, low_vers, high_vers);
1010 	else
1011 		svcerr_noprog(rqstp);
1012 
1013 	svc_freereq(rqstp);
1014 }
1015 
1016 static void
1017 svc_checkidle(SVCGROUP *grp)
1018 {
1019 	SVCXPRT *xprt, *nxprt;
1020 	time_t timo;
1021 	struct svcxprt_list cleanup;
1022 
1023 	TAILQ_INIT(&cleanup);
1024 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1025 		/*
1026 		 * Only some transports have idle timers. Don't time
1027 		 * something out which is just waking up.
1028 		 */
1029 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1030 			continue;
1031 
1032 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1033 		if (time_uptime > timo) {
1034 			xprt_unregister_locked(xprt);
1035 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1036 		}
1037 	}
1038 
1039 	mtx_unlock(&grp->sg_lock);
1040 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1041 		SVC_RELEASE(xprt);
1042 	}
1043 	mtx_lock(&grp->sg_lock);
1044 }
1045 
1046 static void
1047 svc_assign_waiting_sockets(SVCPOOL *pool)
1048 {
1049 	SVCGROUP *grp;
1050 	SVCXPRT *xprt;
1051 	int g;
1052 
1053 	for (g = 0; g < pool->sp_groupcount; g++) {
1054 		grp = &pool->sp_groups[g];
1055 		mtx_lock(&grp->sg_lock);
1056 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1057 			if (xprt_assignthread(xprt))
1058 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1059 			else
1060 				break;
1061 		}
1062 		mtx_unlock(&grp->sg_lock);
1063 	}
1064 }
1065 
1066 static void
1067 svc_change_space_used(SVCPOOL *pool, int delta)
1068 {
1069 	unsigned int value;
1070 
1071 	value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
1072 	if (delta > 0) {
1073 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1074 			pool->sp_space_throttled = TRUE;
1075 			pool->sp_space_throttle_count++;
1076 		}
1077 		if (value > pool->sp_space_used_highest)
1078 			pool->sp_space_used_highest = value;
1079 	} else {
1080 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1081 			pool->sp_space_throttled = FALSE;
1082 			svc_assign_waiting_sockets(pool);
1083 		}
1084 	}
1085 }
1086 
1087 static bool_t
1088 svc_request_space_available(SVCPOOL *pool)
1089 {
1090 
1091 	if (pool->sp_space_throttled)
1092 		return (FALSE);
1093 	return (TRUE);
1094 }
1095 
1096 static void
1097 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1098 {
1099 	SVCPOOL *pool = grp->sg_pool;
1100 	SVCTHREAD *st, *stpref;
1101 	SVCXPRT *xprt;
1102 	enum xprt_stat stat;
1103 	struct svc_req *rqstp;
1104 	size_t sz;
1105 	int error;
1106 
1107 	st = mem_alloc(sizeof(*st));
1108 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1109 	st->st_pool = pool;
1110 	st->st_xprt = NULL;
1111 	STAILQ_INIT(&st->st_reqs);
1112 	cv_init(&st->st_cond, "rpcsvc");
1113 
1114 	mtx_lock(&grp->sg_lock);
1115 
1116 	/*
1117 	 * If we are a new thread which was spawned to cope with
1118 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1119 	 */
1120 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1121 		grp->sg_state = SVCPOOL_ACTIVE;
1122 
1123 	while (grp->sg_state != SVCPOOL_CLOSING) {
1124 		/*
1125 		 * Create new thread if requested.
1126 		 */
1127 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1128 			grp->sg_state = SVCPOOL_THREADSTARTING;
1129 			grp->sg_lastcreatetime = time_uptime;
1130 			mtx_unlock(&grp->sg_lock);
1131 			svc_new_thread(grp);
1132 			mtx_lock(&grp->sg_lock);
1133 			continue;
1134 		}
1135 
1136 		/*
1137 		 * Check for idle transports once per second.
1138 		 */
1139 		if (time_uptime > grp->sg_lastidlecheck) {
1140 			grp->sg_lastidlecheck = time_uptime;
1141 			svc_checkidle(grp);
1142 		}
1143 
1144 		xprt = st->st_xprt;
1145 		if (!xprt) {
1146 			/*
1147 			 * Enforce maxthreads count.
1148 			 */
1149 			if (grp->sg_threadcount > grp->sg_maxthreads)
1150 				break;
1151 
1152 			/*
1153 			 * Before sleeping, see if we can find an
1154 			 * active transport which isn't being serviced
1155 			 * by a thread.
1156 			 */
1157 			if (svc_request_space_available(pool) &&
1158 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1159 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1160 				SVC_ACQUIRE(xprt);
1161 				xprt->xp_thread = st;
1162 				st->st_xprt = xprt;
1163 				continue;
1164 			}
1165 
1166 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1167 			if (ismaster || (!ismaster &&
1168 			    grp->sg_threadcount > grp->sg_minthreads))
1169 				error = cv_timedwait_sig(&st->st_cond,
1170 				    &grp->sg_lock, 5 * hz);
1171 			else
1172 				error = cv_wait_sig(&st->st_cond,
1173 				    &grp->sg_lock);
1174 			if (st->st_xprt == NULL)
1175 				LIST_REMOVE(st, st_ilink);
1176 
1177 			/*
1178 			 * Reduce worker thread count when idle.
1179 			 */
1180 			if (error == EWOULDBLOCK) {
1181 				if (!ismaster
1182 				    && (grp->sg_threadcount
1183 					> grp->sg_minthreads)
1184 					&& !st->st_xprt)
1185 					break;
1186 			} else if (error) {
1187 				mtx_unlock(&grp->sg_lock);
1188 				svc_exit(pool);
1189 				mtx_lock(&grp->sg_lock);
1190 				break;
1191 			}
1192 			continue;
1193 		}
1194 		mtx_unlock(&grp->sg_lock);
1195 
1196 		/*
1197 		 * Drain the transport socket and queue up any RPCs.
1198 		 */
1199 		xprt->xp_lastactive = time_uptime;
1200 		do {
1201 			if (!svc_request_space_available(pool))
1202 				break;
1203 			rqstp = NULL;
1204 			stat = svc_getreq(xprt, &rqstp);
1205 			if (rqstp) {
1206 				svc_change_space_used(pool, rqstp->rq_size);
1207 				/*
1208 				 * See if the application has a preference
1209 				 * for some other thread.
1210 				 */
1211 				if (pool->sp_assign) {
1212 					stpref = pool->sp_assign(st, rqstp);
1213 					rqstp->rq_thread = stpref;
1214 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1215 					    rqstp, rq_link);
1216 					mtx_unlock(&stpref->st_lock);
1217 					if (stpref != st)
1218 						rqstp = NULL;
1219 				} else {
1220 					rqstp->rq_thread = st;
1221 					STAILQ_INSERT_TAIL(&st->st_reqs,
1222 					    rqstp, rq_link);
1223 				}
1224 			}
1225 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1226 		    && grp->sg_state != SVCPOOL_CLOSING);
1227 
1228 		/*
1229 		 * Move this transport to the end of the active list to
1230 		 * ensure fairness when multiple transports are active.
1231 		 * If this was the last queued request, svc_getreq will end
1232 		 * up calling xprt_inactive to remove from the active list.
1233 		 */
1234 		mtx_lock(&grp->sg_lock);
1235 		xprt->xp_thread = NULL;
1236 		st->st_xprt = NULL;
1237 		if (xprt->xp_active) {
1238 			if (!svc_request_space_available(pool) ||
1239 			    !xprt_assignthread(xprt))
1240 				TAILQ_INSERT_TAIL(&grp->sg_active,
1241 				    xprt, xp_alink);
1242 		}
1243 		mtx_unlock(&grp->sg_lock);
1244 		SVC_RELEASE(xprt);
1245 
1246 		/*
1247 		 * Execute what we have queued.
1248 		 */
1249 		sz = 0;
1250 		mtx_lock(&st->st_lock);
1251 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1252 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1253 			mtx_unlock(&st->st_lock);
1254 			sz += rqstp->rq_size;
1255 			svc_executereq(rqstp);
1256 			mtx_lock(&st->st_lock);
1257 		}
1258 		mtx_unlock(&st->st_lock);
1259 		svc_change_space_used(pool, -sz);
1260 		mtx_lock(&grp->sg_lock);
1261 	}
1262 
1263 	if (st->st_xprt) {
1264 		xprt = st->st_xprt;
1265 		st->st_xprt = NULL;
1266 		SVC_RELEASE(xprt);
1267 	}
1268 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1269 	mtx_destroy(&st->st_lock);
1270 	cv_destroy(&st->st_cond);
1271 	mem_free(st, sizeof(*st));
1272 
1273 	grp->sg_threadcount--;
1274 	if (!ismaster)
1275 		wakeup(grp);
1276 	mtx_unlock(&grp->sg_lock);
1277 }
1278 
1279 static void
1280 svc_thread_start(void *arg)
1281 {
1282 
1283 	svc_run_internal((SVCGROUP *) arg, FALSE);
1284 	kthread_exit();
1285 }
1286 
1287 static void
1288 svc_new_thread(SVCGROUP *grp)
1289 {
1290 	SVCPOOL *pool = grp->sg_pool;
1291 	struct thread *td;
1292 
1293 	grp->sg_threadcount++;
1294 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1295 	    "%s: service", pool->sp_name);
1296 }
1297 
1298 void
1299 svc_run(SVCPOOL *pool)
1300 {
1301 	int g, i;
1302 	struct proc *p;
1303 	struct thread *td;
1304 	SVCGROUP *grp;
1305 
1306 	p = curproc;
1307 	td = curthread;
1308 	snprintf(td->td_name, sizeof(td->td_name),
1309 	    "%s: master", pool->sp_name);
1310 	pool->sp_state = SVCPOOL_ACTIVE;
1311 	pool->sp_proc = p;
1312 
1313 	/* Choose group count based on number of threads and CPUs. */
1314 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1315 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1316 	for (g = 0; g < pool->sp_groupcount; g++) {
1317 		grp = &pool->sp_groups[g];
1318 		grp->sg_minthreads = max(1,
1319 		    pool->sp_minthreads / pool->sp_groupcount);
1320 		grp->sg_maxthreads = max(1,
1321 		    pool->sp_maxthreads / pool->sp_groupcount);
1322 		grp->sg_lastcreatetime = time_uptime;
1323 	}
1324 
1325 	/* Starting threads */
1326 	for (g = 0; g < pool->sp_groupcount; g++) {
1327 		grp = &pool->sp_groups[g];
1328 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1329 			svc_new_thread(grp);
1330 	}
1331 	pool->sp_groups[0].sg_threadcount++;
1332 	svc_run_internal(&pool->sp_groups[0], TRUE);
1333 
1334 	/* Waiting for threads to stop. */
1335 	for (g = 0; g < pool->sp_groupcount; g++) {
1336 		grp = &pool->sp_groups[g];
1337 		mtx_lock(&grp->sg_lock);
1338 		while (grp->sg_threadcount > 0)
1339 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1340 		mtx_unlock(&grp->sg_lock);
1341 	}
1342 }
1343 
1344 void
1345 svc_exit(SVCPOOL *pool)
1346 {
1347 	SVCGROUP *grp;
1348 	SVCTHREAD *st;
1349 	int g;
1350 
1351 	pool->sp_state = SVCPOOL_CLOSING;
1352 	for (g = 0; g < pool->sp_groupcount; g++) {
1353 		grp = &pool->sp_groups[g];
1354 		mtx_lock(&grp->sg_lock);
1355 		if (grp->sg_state != SVCPOOL_CLOSING) {
1356 			grp->sg_state = SVCPOOL_CLOSING;
1357 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1358 				cv_signal(&st->st_cond);
1359 		}
1360 		mtx_unlock(&grp->sg_lock);
1361 	}
1362 }
1363 
1364 bool_t
1365 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1366 {
1367 	struct mbuf *m;
1368 	XDR xdrs;
1369 	bool_t stat;
1370 
1371 	m = rqstp->rq_args;
1372 	rqstp->rq_args = NULL;
1373 
1374 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1375 	stat = xargs(&xdrs, args);
1376 	XDR_DESTROY(&xdrs);
1377 
1378 	return (stat);
1379 }
1380 
1381 bool_t
1382 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1383 {
1384 	XDR xdrs;
1385 
1386 	if (rqstp->rq_addr) {
1387 		free(rqstp->rq_addr, M_SONAME);
1388 		rqstp->rq_addr = NULL;
1389 	}
1390 
1391 	xdrs.x_op = XDR_FREE;
1392 	return (xargs(&xdrs, args));
1393 }
1394 
1395 void
1396 svc_freereq(struct svc_req *rqstp)
1397 {
1398 	SVCTHREAD *st;
1399 	SVCPOOL *pool;
1400 
1401 	st = rqstp->rq_thread;
1402 	if (st) {
1403 		pool = st->st_pool;
1404 		if (pool->sp_done)
1405 			pool->sp_done(st, rqstp);
1406 	}
1407 
1408 	if (rqstp->rq_auth.svc_ah_ops)
1409 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1410 
1411 	if (rqstp->rq_xprt) {
1412 		SVC_RELEASE(rqstp->rq_xprt);
1413 	}
1414 
1415 	if (rqstp->rq_addr)
1416 		free(rqstp->rq_addr, M_SONAME);
1417 
1418 	if (rqstp->rq_args)
1419 		m_freem(rqstp->rq_args);
1420 
1421 	free(rqstp, M_RPC);
1422 }
1423