xref: /freebsd/sys/rpc/svc.c (revision c6a33c8e88c5684876e670c8189d03ad25108d8a)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
69 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71 
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73     char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 
79 /* ***************  SVCXPRT related stuff **************** */
80 
81 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
82 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84 
85 SVCPOOL*
86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87 {
88 	SVCPOOL *pool;
89 	SVCGROUP *grp;
90 	int g;
91 
92 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93 
94 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95 	pool->sp_name = name;
96 	pool->sp_state = SVCPOOL_INIT;
97 	pool->sp_proc = NULL;
98 	TAILQ_INIT(&pool->sp_callouts);
99 	TAILQ_INIT(&pool->sp_lcallouts);
100 	pool->sp_minthreads = 1;
101 	pool->sp_maxthreads = 1;
102 	pool->sp_groupcount = 1;
103 	for (g = 0; g < SVC_MAXGROUPS; g++) {
104 		grp = &pool->sp_groups[g];
105 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106 		grp->sg_pool = pool;
107 		grp->sg_state = SVCPOOL_ACTIVE;
108 		TAILQ_INIT(&grp->sg_xlist);
109 		TAILQ_INIT(&grp->sg_active);
110 		LIST_INIT(&grp->sg_idlethreads);
111 		grp->sg_minthreads = 1;
112 		grp->sg_maxthreads = 1;
113 	}
114 
115 	/*
116 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
117 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
118 	 * on LP64 architectures, so cast to u_long to avoid undefined
119 	 * behavior.  (ILP32 architectures cannot have nmbclusters
120 	 * large enough to overflow for other reasons.)
121 	 */
122 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
123 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
124 
125 	sysctl_ctx_init(&pool->sp_sysctl);
126 	if (sysctl_base) {
127 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
128 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
129 		    pool, 0, svcpool_minthread_sysctl, "I",
130 		    "Minimal number of threads");
131 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
133 		    pool, 0, svcpool_maxthread_sysctl, "I",
134 		    "Maximal number of threads");
135 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136 		    "threads", CTLTYPE_INT | CTLFLAG_RD,
137 		    pool, 0, svcpool_threads_sysctl, "I",
138 		    "Current number of threads");
139 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
141 		    "Number of thread groups");
142 
143 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144 		    "request_space_used", CTLFLAG_RD,
145 		    &pool->sp_space_used,
146 		    "Space in parsed but not handled requests.");
147 
148 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
149 		    "request_space_used_highest", CTLFLAG_RD,
150 		    &pool->sp_space_used_highest,
151 		    "Highest space used since reboot.");
152 
153 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
154 		    "request_space_high", CTLFLAG_RW,
155 		    &pool->sp_space_high,
156 		    "Maximum space in parsed but not handled requests.");
157 
158 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
159 		    "request_space_low", CTLFLAG_RW,
160 		    &pool->sp_space_low,
161 		    "Low water mark for request space.");
162 
163 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
164 		    "request_space_throttled", CTLFLAG_RD,
165 		    &pool->sp_space_throttled, 0,
166 		    "Whether nfs requests are currently throttled");
167 
168 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
169 		    "request_space_throttle_count", CTLFLAG_RD,
170 		    &pool->sp_space_throttle_count, 0,
171 		    "Count of times throttling based on request space has occurred");
172 	}
173 
174 	return pool;
175 }
176 
177 void
178 svcpool_destroy(SVCPOOL *pool)
179 {
180 	SVCGROUP *grp;
181 	SVCXPRT *xprt, *nxprt;
182 	struct svc_callout *s;
183 	struct svc_loss_callout *sl;
184 	struct svcxprt_list cleanup;
185 	int g;
186 
187 	TAILQ_INIT(&cleanup);
188 
189 	for (g = 0; g < SVC_MAXGROUPS; g++) {
190 		grp = &pool->sp_groups[g];
191 		mtx_lock(&grp->sg_lock);
192 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
193 			xprt_unregister_locked(xprt);
194 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
195 		}
196 		mtx_unlock(&grp->sg_lock);
197 	}
198 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
199 		SVC_RELEASE(xprt);
200 	}
201 
202 	mtx_lock(&pool->sp_lock);
203 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
204 		mtx_unlock(&pool->sp_lock);
205 		svc_unreg(pool, s->sc_prog, s->sc_vers);
206 		mtx_lock(&pool->sp_lock);
207 	}
208 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
209 		mtx_unlock(&pool->sp_lock);
210 		svc_loss_unreg(pool, sl->slc_dispatch);
211 		mtx_lock(&pool->sp_lock);
212 	}
213 	mtx_unlock(&pool->sp_lock);
214 
215 	for (g = 0; g < SVC_MAXGROUPS; g++) {
216 		grp = &pool->sp_groups[g];
217 		mtx_destroy(&grp->sg_lock);
218 	}
219 	mtx_destroy(&pool->sp_lock);
220 
221 	if (pool->sp_rcache)
222 		replay_freecache(pool->sp_rcache);
223 
224 	sysctl_ctx_free(&pool->sp_sysctl);
225 	free(pool, M_RPC);
226 }
227 
228 /*
229  * Sysctl handler to get the present thread count on a pool
230  */
231 static int
232 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
233 {
234 	SVCPOOL *pool;
235 	int threads, error, g;
236 
237 	pool = oidp->oid_arg1;
238 	threads = 0;
239 	mtx_lock(&pool->sp_lock);
240 	for (g = 0; g < pool->sp_groupcount; g++)
241 		threads += pool->sp_groups[g].sg_threadcount;
242 	mtx_unlock(&pool->sp_lock);
243 	error = sysctl_handle_int(oidp, &threads, 0, req);
244 	return (error);
245 }
246 
247 /*
248  * Sysctl handler to set the minimum thread count on a pool
249  */
250 static int
251 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
252 {
253 	SVCPOOL *pool;
254 	int newminthreads, error, g;
255 
256 	pool = oidp->oid_arg1;
257 	newminthreads = pool->sp_minthreads;
258 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
259 	if (error == 0 && newminthreads != pool->sp_minthreads) {
260 		if (newminthreads > pool->sp_maxthreads)
261 			return (EINVAL);
262 		mtx_lock(&pool->sp_lock);
263 		pool->sp_minthreads = newminthreads;
264 		for (g = 0; g < pool->sp_groupcount; g++) {
265 			pool->sp_groups[g].sg_minthreads = max(1,
266 			    pool->sp_minthreads / pool->sp_groupcount);
267 		}
268 		mtx_unlock(&pool->sp_lock);
269 	}
270 	return (error);
271 }
272 
273 /*
274  * Sysctl handler to set the maximum thread count on a pool
275  */
276 static int
277 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
278 {
279 	SVCPOOL *pool;
280 	int newmaxthreads, error, g;
281 
282 	pool = oidp->oid_arg1;
283 	newmaxthreads = pool->sp_maxthreads;
284 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
285 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
286 		if (newmaxthreads < pool->sp_minthreads)
287 			return (EINVAL);
288 		mtx_lock(&pool->sp_lock);
289 		pool->sp_maxthreads = newmaxthreads;
290 		for (g = 0; g < pool->sp_groupcount; g++) {
291 			pool->sp_groups[g].sg_maxthreads = max(1,
292 			    pool->sp_maxthreads / pool->sp_groupcount);
293 		}
294 		mtx_unlock(&pool->sp_lock);
295 	}
296 	return (error);
297 }
298 
299 /*
300  * Activate a transport handle.
301  */
302 void
303 xprt_register(SVCXPRT *xprt)
304 {
305 	SVCPOOL *pool = xprt->xp_pool;
306 	SVCGROUP *grp;
307 	int g;
308 
309 	SVC_ACQUIRE(xprt);
310 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
311 	xprt->xp_group = grp = &pool->sp_groups[g];
312 	mtx_lock(&grp->sg_lock);
313 	xprt->xp_registered = TRUE;
314 	xprt->xp_active = FALSE;
315 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
316 	mtx_unlock(&grp->sg_lock);
317 }
318 
319 /*
320  * De-activate a transport handle. Note: the locked version doesn't
321  * release the transport - caller must do that after dropping the pool
322  * lock.
323  */
324 static void
325 xprt_unregister_locked(SVCXPRT *xprt)
326 {
327 	SVCGROUP *grp = xprt->xp_group;
328 
329 	mtx_assert(&grp->sg_lock, MA_OWNED);
330 	KASSERT(xprt->xp_registered == TRUE,
331 	    ("xprt_unregister_locked: not registered"));
332 	xprt_inactive_locked(xprt);
333 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
334 	xprt->xp_registered = FALSE;
335 }
336 
337 void
338 xprt_unregister(SVCXPRT *xprt)
339 {
340 	SVCGROUP *grp = xprt->xp_group;
341 
342 	mtx_lock(&grp->sg_lock);
343 	if (xprt->xp_registered == FALSE) {
344 		/* Already unregistered by another thread */
345 		mtx_unlock(&grp->sg_lock);
346 		return;
347 	}
348 	xprt_unregister_locked(xprt);
349 	mtx_unlock(&grp->sg_lock);
350 
351 	SVC_RELEASE(xprt);
352 }
353 
354 /*
355  * Attempt to assign a service thread to this transport.
356  */
357 static int
358 xprt_assignthread(SVCXPRT *xprt)
359 {
360 	SVCGROUP *grp = xprt->xp_group;
361 	SVCTHREAD *st;
362 
363 	mtx_assert(&grp->sg_lock, MA_OWNED);
364 	st = LIST_FIRST(&grp->sg_idlethreads);
365 	if (st) {
366 		LIST_REMOVE(st, st_ilink);
367 		SVC_ACQUIRE(xprt);
368 		xprt->xp_thread = st;
369 		st->st_xprt = xprt;
370 		cv_signal(&st->st_cond);
371 		return (TRUE);
372 	} else {
373 		/*
374 		 * See if we can create a new thread. The
375 		 * actual thread creation happens in
376 		 * svc_run_internal because our locking state
377 		 * is poorly defined (we are typically called
378 		 * from a socket upcall). Don't create more
379 		 * than one thread per second.
380 		 */
381 		if (grp->sg_state == SVCPOOL_ACTIVE
382 		    && grp->sg_lastcreatetime < time_uptime
383 		    && grp->sg_threadcount < grp->sg_maxthreads) {
384 			grp->sg_state = SVCPOOL_THREADWANTED;
385 		}
386 	}
387 	return (FALSE);
388 }
389 
390 void
391 xprt_active(SVCXPRT *xprt)
392 {
393 	SVCGROUP *grp = xprt->xp_group;
394 
395 	mtx_lock(&grp->sg_lock);
396 
397 	if (!xprt->xp_registered) {
398 		/*
399 		 * Race with xprt_unregister - we lose.
400 		 */
401 		mtx_unlock(&grp->sg_lock);
402 		return;
403 	}
404 
405 	if (!xprt->xp_active) {
406 		xprt->xp_active = TRUE;
407 		if (xprt->xp_thread == NULL) {
408 			if (!svc_request_space_available(xprt->xp_pool) ||
409 			    !xprt_assignthread(xprt))
410 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
411 				    xp_alink);
412 		}
413 	}
414 
415 	mtx_unlock(&grp->sg_lock);
416 }
417 
418 void
419 xprt_inactive_locked(SVCXPRT *xprt)
420 {
421 	SVCGROUP *grp = xprt->xp_group;
422 
423 	mtx_assert(&grp->sg_lock, MA_OWNED);
424 	if (xprt->xp_active) {
425 		if (xprt->xp_thread == NULL)
426 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
427 		xprt->xp_active = FALSE;
428 	}
429 }
430 
431 void
432 xprt_inactive(SVCXPRT *xprt)
433 {
434 	SVCGROUP *grp = xprt->xp_group;
435 
436 	mtx_lock(&grp->sg_lock);
437 	xprt_inactive_locked(xprt);
438 	mtx_unlock(&grp->sg_lock);
439 }
440 
441 /*
442  * Variant of xprt_inactive() for use only when sure that port is
443  * assigned to thread. For example, withing receive handlers.
444  */
445 void
446 xprt_inactive_self(SVCXPRT *xprt)
447 {
448 
449 	KASSERT(xprt->xp_thread != NULL,
450 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
451 	xprt->xp_active = FALSE;
452 }
453 
454 /*
455  * Add a service program to the callout list.
456  * The dispatch routine will be called when a rpc request for this
457  * program number comes in.
458  */
459 bool_t
460 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
461     void (*dispatch)(struct svc_req *, SVCXPRT *),
462     const struct netconfig *nconf)
463 {
464 	SVCPOOL *pool = xprt->xp_pool;
465 	struct svc_callout *s;
466 	char *netid = NULL;
467 	int flag = 0;
468 
469 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
470 
471 	if (xprt->xp_netid) {
472 		netid = strdup(xprt->xp_netid, M_RPC);
473 		flag = 1;
474 	} else if (nconf && nconf->nc_netid) {
475 		netid = strdup(nconf->nc_netid, M_RPC);
476 		flag = 1;
477 	} /* must have been created with svc_raw_create */
478 	if ((netid == NULL) && (flag == 1)) {
479 		return (FALSE);
480 	}
481 
482 	mtx_lock(&pool->sp_lock);
483 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
484 		if (netid)
485 			free(netid, M_RPC);
486 		if (s->sc_dispatch == dispatch)
487 			goto rpcb_it; /* he is registering another xptr */
488 		mtx_unlock(&pool->sp_lock);
489 		return (FALSE);
490 	}
491 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
492 	if (s == NULL) {
493 		if (netid)
494 			free(netid, M_RPC);
495 		mtx_unlock(&pool->sp_lock);
496 		return (FALSE);
497 	}
498 
499 	s->sc_prog = prog;
500 	s->sc_vers = vers;
501 	s->sc_dispatch = dispatch;
502 	s->sc_netid = netid;
503 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
504 
505 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
506 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
507 
508 rpcb_it:
509 	mtx_unlock(&pool->sp_lock);
510 	/* now register the information with the local binder service */
511 	if (nconf) {
512 		bool_t dummy;
513 		struct netconfig tnc;
514 		struct netbuf nb;
515 		tnc = *nconf;
516 		nb.buf = &xprt->xp_ltaddr;
517 		nb.len = xprt->xp_ltaddr.ss_len;
518 		dummy = rpcb_set(prog, vers, &tnc, &nb);
519 		return (dummy);
520 	}
521 	return (TRUE);
522 }
523 
524 /*
525  * Remove a service program from the callout list.
526  */
527 void
528 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
529 {
530 	struct svc_callout *s;
531 
532 	/* unregister the information anyway */
533 	(void) rpcb_unset(prog, vers, NULL);
534 	mtx_lock(&pool->sp_lock);
535 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
536 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
537 		if (s->sc_netid)
538 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
539 		mem_free(s, sizeof (struct svc_callout));
540 	}
541 	mtx_unlock(&pool->sp_lock);
542 }
543 
544 /*
545  * Add a service connection loss program to the callout list.
546  * The dispatch routine will be called when some port in ths pool die.
547  */
548 bool_t
549 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
550 {
551 	SVCPOOL *pool = xprt->xp_pool;
552 	struct svc_loss_callout *s;
553 
554 	mtx_lock(&pool->sp_lock);
555 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
556 		if (s->slc_dispatch == dispatch)
557 			break;
558 	}
559 	if (s != NULL) {
560 		mtx_unlock(&pool->sp_lock);
561 		return (TRUE);
562 	}
563 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
564 	if (s == NULL) {
565 		mtx_unlock(&pool->sp_lock);
566 		return (FALSE);
567 	}
568 	s->slc_dispatch = dispatch;
569 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
570 	mtx_unlock(&pool->sp_lock);
571 	return (TRUE);
572 }
573 
574 /*
575  * Remove a service connection loss program from the callout list.
576  */
577 void
578 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
579 {
580 	struct svc_loss_callout *s;
581 
582 	mtx_lock(&pool->sp_lock);
583 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
584 		if (s->slc_dispatch == dispatch) {
585 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
586 			free(s, M_RPC);
587 			break;
588 		}
589 	}
590 	mtx_unlock(&pool->sp_lock);
591 }
592 
593 /* ********************** CALLOUT list related stuff ************* */
594 
595 /*
596  * Search the callout list for a program number, return the callout
597  * struct.
598  */
599 static struct svc_callout *
600 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
601 {
602 	struct svc_callout *s;
603 
604 	mtx_assert(&pool->sp_lock, MA_OWNED);
605 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
606 		if (s->sc_prog == prog && s->sc_vers == vers
607 		    && (netid == NULL || s->sc_netid == NULL ||
608 			strcmp(netid, s->sc_netid) == 0))
609 			break;
610 	}
611 
612 	return (s);
613 }
614 
615 /* ******************* REPLY GENERATION ROUTINES  ************ */
616 
617 static bool_t
618 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
619     struct mbuf *body)
620 {
621 	SVCXPRT *xprt = rqstp->rq_xprt;
622 	bool_t ok;
623 
624 	if (rqstp->rq_args) {
625 		m_freem(rqstp->rq_args);
626 		rqstp->rq_args = NULL;
627 	}
628 
629 	if (xprt->xp_pool->sp_rcache)
630 		replay_setreply(xprt->xp_pool->sp_rcache,
631 		    rply, svc_getrpccaller(rqstp), body);
632 
633 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
634 		return (FALSE);
635 
636 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
637 	if (rqstp->rq_addr) {
638 		free(rqstp->rq_addr, M_SONAME);
639 		rqstp->rq_addr = NULL;
640 	}
641 
642 	return (ok);
643 }
644 
645 /*
646  * Send a reply to an rpc request
647  */
648 bool_t
649 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
650 {
651 	struct rpc_msg rply;
652 	struct mbuf *m;
653 	XDR xdrs;
654 	bool_t ok;
655 
656 	rply.rm_xid = rqstp->rq_xid;
657 	rply.rm_direction = REPLY;
658 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
659 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
660 	rply.acpted_rply.ar_stat = SUCCESS;
661 	rply.acpted_rply.ar_results.where = NULL;
662 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
663 
664 	m = m_getcl(M_WAITOK, MT_DATA, 0);
665 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
666 	ok = xdr_results(&xdrs, xdr_location);
667 	XDR_DESTROY(&xdrs);
668 
669 	if (ok) {
670 		return (svc_sendreply_common(rqstp, &rply, m));
671 	} else {
672 		m_freem(m);
673 		return (FALSE);
674 	}
675 }
676 
677 bool_t
678 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
679 {
680 	struct rpc_msg rply;
681 
682 	rply.rm_xid = rqstp->rq_xid;
683 	rply.rm_direction = REPLY;
684 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
685 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
686 	rply.acpted_rply.ar_stat = SUCCESS;
687 	rply.acpted_rply.ar_results.where = NULL;
688 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
689 
690 	return (svc_sendreply_common(rqstp, &rply, m));
691 }
692 
693 /*
694  * No procedure error reply
695  */
696 void
697 svcerr_noproc(struct svc_req *rqstp)
698 {
699 	SVCXPRT *xprt = rqstp->rq_xprt;
700 	struct rpc_msg rply;
701 
702 	rply.rm_xid = rqstp->rq_xid;
703 	rply.rm_direction = REPLY;
704 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
705 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
706 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
707 
708 	if (xprt->xp_pool->sp_rcache)
709 		replay_setreply(xprt->xp_pool->sp_rcache,
710 		    &rply, svc_getrpccaller(rqstp), NULL);
711 
712 	svc_sendreply_common(rqstp, &rply, NULL);
713 }
714 
715 /*
716  * Can't decode args error reply
717  */
718 void
719 svcerr_decode(struct svc_req *rqstp)
720 {
721 	SVCXPRT *xprt = rqstp->rq_xprt;
722 	struct rpc_msg rply;
723 
724 	rply.rm_xid = rqstp->rq_xid;
725 	rply.rm_direction = REPLY;
726 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
729 
730 	if (xprt->xp_pool->sp_rcache)
731 		replay_setreply(xprt->xp_pool->sp_rcache,
732 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
733 
734 	svc_sendreply_common(rqstp, &rply, NULL);
735 }
736 
737 /*
738  * Some system error
739  */
740 void
741 svcerr_systemerr(struct svc_req *rqstp)
742 {
743 	SVCXPRT *xprt = rqstp->rq_xprt;
744 	struct rpc_msg rply;
745 
746 	rply.rm_xid = rqstp->rq_xid;
747 	rply.rm_direction = REPLY;
748 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
751 
752 	if (xprt->xp_pool->sp_rcache)
753 		replay_setreply(xprt->xp_pool->sp_rcache,
754 		    &rply, svc_getrpccaller(rqstp), NULL);
755 
756 	svc_sendreply_common(rqstp, &rply, NULL);
757 }
758 
759 /*
760  * Authentication error reply
761  */
762 void
763 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
764 {
765 	SVCXPRT *xprt = rqstp->rq_xprt;
766 	struct rpc_msg rply;
767 
768 	rply.rm_xid = rqstp->rq_xid;
769 	rply.rm_direction = REPLY;
770 	rply.rm_reply.rp_stat = MSG_DENIED;
771 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
772 	rply.rjcted_rply.rj_why = why;
773 
774 	if (xprt->xp_pool->sp_rcache)
775 		replay_setreply(xprt->xp_pool->sp_rcache,
776 		    &rply, svc_getrpccaller(rqstp), NULL);
777 
778 	svc_sendreply_common(rqstp, &rply, NULL);
779 }
780 
781 /*
782  * Auth too weak error reply
783  */
784 void
785 svcerr_weakauth(struct svc_req *rqstp)
786 {
787 
788 	svcerr_auth(rqstp, AUTH_TOOWEAK);
789 }
790 
791 /*
792  * Program unavailable error reply
793  */
794 void
795 svcerr_noprog(struct svc_req *rqstp)
796 {
797 	SVCXPRT *xprt = rqstp->rq_xprt;
798 	struct rpc_msg rply;
799 
800 	rply.rm_xid = rqstp->rq_xid;
801 	rply.rm_direction = REPLY;
802 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
803 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
804 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
805 
806 	if (xprt->xp_pool->sp_rcache)
807 		replay_setreply(xprt->xp_pool->sp_rcache,
808 		    &rply, svc_getrpccaller(rqstp), NULL);
809 
810 	svc_sendreply_common(rqstp, &rply, NULL);
811 }
812 
813 /*
814  * Program version mismatch error reply
815  */
816 void
817 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
818 {
819 	SVCXPRT *xprt = rqstp->rq_xprt;
820 	struct rpc_msg rply;
821 
822 	rply.rm_xid = rqstp->rq_xid;
823 	rply.rm_direction = REPLY;
824 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
825 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
826 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
827 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
828 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
829 
830 	if (xprt->xp_pool->sp_rcache)
831 		replay_setreply(xprt->xp_pool->sp_rcache,
832 		    &rply, svc_getrpccaller(rqstp), NULL);
833 
834 	svc_sendreply_common(rqstp, &rply, NULL);
835 }
836 
837 /*
838  * Allocate a new server transport structure. All fields are
839  * initialized to zero and xp_p3 is initialized to point at an
840  * extension structure to hold various flags and authentication
841  * parameters.
842  */
843 SVCXPRT *
844 svc_xprt_alloc()
845 {
846 	SVCXPRT *xprt;
847 	SVCXPRT_EXT *ext;
848 
849 	xprt = mem_alloc(sizeof(SVCXPRT));
850 	memset(xprt, 0, sizeof(SVCXPRT));
851 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
852 	memset(ext, 0, sizeof(SVCXPRT_EXT));
853 	xprt->xp_p3 = ext;
854 	refcount_init(&xprt->xp_refs, 1);
855 
856 	return (xprt);
857 }
858 
859 /*
860  * Free a server transport structure.
861  */
862 void
863 svc_xprt_free(xprt)
864 	SVCXPRT *xprt;
865 {
866 
867 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
868 	mem_free(xprt, sizeof(SVCXPRT));
869 }
870 
871 /* ******************* SERVER INPUT STUFF ******************* */
872 
873 /*
874  * Read RPC requests from a transport and queue them to be
875  * executed. We handle authentication and replay cache replies here.
876  * Actually dispatching the RPC is deferred till svc_executereq.
877  */
878 static enum xprt_stat
879 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
880 {
881 	SVCPOOL *pool = xprt->xp_pool;
882 	struct svc_req *r;
883 	struct rpc_msg msg;
884 	struct mbuf *args;
885 	struct svc_loss_callout *s;
886 	enum xprt_stat stat;
887 
888 	/* now receive msgs from xprtprt (support batch calls) */
889 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
890 
891 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
892 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
893 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
894 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
895 		enum auth_stat why;
896 
897 		/*
898 		 * Handle replays and authenticate before queuing the
899 		 * request to be executed.
900 		 */
901 		SVC_ACQUIRE(xprt);
902 		r->rq_xprt = xprt;
903 		if (pool->sp_rcache) {
904 			struct rpc_msg repmsg;
905 			struct mbuf *repbody;
906 			enum replay_state rs;
907 			rs = replay_find(pool->sp_rcache, &msg,
908 			    svc_getrpccaller(r), &repmsg, &repbody);
909 			switch (rs) {
910 			case RS_NEW:
911 				break;
912 			case RS_DONE:
913 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
914 				    repbody, &r->rq_reply_seq);
915 				if (r->rq_addr) {
916 					free(r->rq_addr, M_SONAME);
917 					r->rq_addr = NULL;
918 				}
919 				m_freem(args);
920 				goto call_done;
921 
922 			default:
923 				m_freem(args);
924 				goto call_done;
925 			}
926 		}
927 
928 		r->rq_xid = msg.rm_xid;
929 		r->rq_prog = msg.rm_call.cb_prog;
930 		r->rq_vers = msg.rm_call.cb_vers;
931 		r->rq_proc = msg.rm_call.cb_proc;
932 		r->rq_size = sizeof(*r) + m_length(args, NULL);
933 		r->rq_args = args;
934 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
935 			/*
936 			 * RPCSEC_GSS uses this return code
937 			 * for requests that form part of its
938 			 * context establishment protocol and
939 			 * should not be dispatched to the
940 			 * application.
941 			 */
942 			if (why != RPCSEC_GSS_NODISPATCH)
943 				svcerr_auth(r, why);
944 			goto call_done;
945 		}
946 
947 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
948 			svcerr_decode(r);
949 			goto call_done;
950 		}
951 
952 		/*
953 		 * Everything checks out, return request to caller.
954 		 */
955 		*rqstp_ret = r;
956 		r = NULL;
957 	}
958 call_done:
959 	if (r) {
960 		svc_freereq(r);
961 		r = NULL;
962 	}
963 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
964 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
965 			(*s->slc_dispatch)(xprt);
966 		xprt_unregister(xprt);
967 	}
968 
969 	return (stat);
970 }
971 
972 static void
973 svc_executereq(struct svc_req *rqstp)
974 {
975 	SVCXPRT *xprt = rqstp->rq_xprt;
976 	SVCPOOL *pool = xprt->xp_pool;
977 	int prog_found;
978 	rpcvers_t low_vers;
979 	rpcvers_t high_vers;
980 	struct svc_callout *s;
981 
982 	/* now match message with a registered service*/
983 	prog_found = FALSE;
984 	low_vers = (rpcvers_t) -1L;
985 	high_vers = (rpcvers_t) 0L;
986 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
987 		if (s->sc_prog == rqstp->rq_prog) {
988 			if (s->sc_vers == rqstp->rq_vers) {
989 				/*
990 				 * We hand ownership of r to the
991 				 * dispatch method - they must call
992 				 * svc_freereq.
993 				 */
994 				(*s->sc_dispatch)(rqstp, xprt);
995 				return;
996 			}  /* found correct version */
997 			prog_found = TRUE;
998 			if (s->sc_vers < low_vers)
999 				low_vers = s->sc_vers;
1000 			if (s->sc_vers > high_vers)
1001 				high_vers = s->sc_vers;
1002 		}   /* found correct program */
1003 	}
1004 
1005 	/*
1006 	 * if we got here, the program or version
1007 	 * is not served ...
1008 	 */
1009 	if (prog_found)
1010 		svcerr_progvers(rqstp, low_vers, high_vers);
1011 	else
1012 		svcerr_noprog(rqstp);
1013 
1014 	svc_freereq(rqstp);
1015 }
1016 
1017 static void
1018 svc_checkidle(SVCGROUP *grp)
1019 {
1020 	SVCXPRT *xprt, *nxprt;
1021 	time_t timo;
1022 	struct svcxprt_list cleanup;
1023 
1024 	TAILQ_INIT(&cleanup);
1025 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1026 		/*
1027 		 * Only some transports have idle timers. Don't time
1028 		 * something out which is just waking up.
1029 		 */
1030 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1031 			continue;
1032 
1033 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1034 		if (time_uptime > timo) {
1035 			xprt_unregister_locked(xprt);
1036 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1037 		}
1038 	}
1039 
1040 	mtx_unlock(&grp->sg_lock);
1041 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1042 		SVC_RELEASE(xprt);
1043 	}
1044 	mtx_lock(&grp->sg_lock);
1045 }
1046 
1047 static void
1048 svc_assign_waiting_sockets(SVCPOOL *pool)
1049 {
1050 	SVCGROUP *grp;
1051 	SVCXPRT *xprt;
1052 	int g;
1053 
1054 	for (g = 0; g < pool->sp_groupcount; g++) {
1055 		grp = &pool->sp_groups[g];
1056 		mtx_lock(&grp->sg_lock);
1057 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1058 			if (xprt_assignthread(xprt))
1059 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1060 			else
1061 				break;
1062 		}
1063 		mtx_unlock(&grp->sg_lock);
1064 	}
1065 }
1066 
1067 static void
1068 svc_change_space_used(SVCPOOL *pool, long delta)
1069 {
1070 	unsigned long value;
1071 
1072 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1073 	if (delta > 0) {
1074 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1075 			pool->sp_space_throttled = TRUE;
1076 			pool->sp_space_throttle_count++;
1077 		}
1078 		if (value > pool->sp_space_used_highest)
1079 			pool->sp_space_used_highest = value;
1080 	} else {
1081 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1082 			pool->sp_space_throttled = FALSE;
1083 			svc_assign_waiting_sockets(pool);
1084 		}
1085 	}
1086 }
1087 
1088 static bool_t
1089 svc_request_space_available(SVCPOOL *pool)
1090 {
1091 
1092 	if (pool->sp_space_throttled)
1093 		return (FALSE);
1094 	return (TRUE);
1095 }
1096 
1097 static void
1098 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1099 {
1100 	SVCPOOL *pool = grp->sg_pool;
1101 	SVCTHREAD *st, *stpref;
1102 	SVCXPRT *xprt;
1103 	enum xprt_stat stat;
1104 	struct svc_req *rqstp;
1105 	struct proc *p;
1106 	long sz;
1107 	int error;
1108 
1109 	st = mem_alloc(sizeof(*st));
1110 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1111 	st->st_pool = pool;
1112 	st->st_xprt = NULL;
1113 	STAILQ_INIT(&st->st_reqs);
1114 	cv_init(&st->st_cond, "rpcsvc");
1115 
1116 	mtx_lock(&grp->sg_lock);
1117 
1118 	/*
1119 	 * If we are a new thread which was spawned to cope with
1120 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1121 	 */
1122 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1123 		grp->sg_state = SVCPOOL_ACTIVE;
1124 
1125 	while (grp->sg_state != SVCPOOL_CLOSING) {
1126 		/*
1127 		 * Create new thread if requested.
1128 		 */
1129 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1130 			grp->sg_state = SVCPOOL_THREADSTARTING;
1131 			grp->sg_lastcreatetime = time_uptime;
1132 			mtx_unlock(&grp->sg_lock);
1133 			svc_new_thread(grp);
1134 			mtx_lock(&grp->sg_lock);
1135 			continue;
1136 		}
1137 
1138 		/*
1139 		 * Check for idle transports once per second.
1140 		 */
1141 		if (time_uptime > grp->sg_lastidlecheck) {
1142 			grp->sg_lastidlecheck = time_uptime;
1143 			svc_checkidle(grp);
1144 		}
1145 
1146 		xprt = st->st_xprt;
1147 		if (!xprt) {
1148 			/*
1149 			 * Enforce maxthreads count.
1150 			 */
1151 			if (grp->sg_threadcount > grp->sg_maxthreads)
1152 				break;
1153 
1154 			/*
1155 			 * Before sleeping, see if we can find an
1156 			 * active transport which isn't being serviced
1157 			 * by a thread.
1158 			 */
1159 			if (svc_request_space_available(pool) &&
1160 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1161 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1162 				SVC_ACQUIRE(xprt);
1163 				xprt->xp_thread = st;
1164 				st->st_xprt = xprt;
1165 				continue;
1166 			}
1167 
1168 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1169 			if (ismaster || (!ismaster &&
1170 			    grp->sg_threadcount > grp->sg_minthreads))
1171 				error = cv_timedwait_sig(&st->st_cond,
1172 				    &grp->sg_lock, 5 * hz);
1173 			else
1174 				error = cv_wait_sig(&st->st_cond,
1175 				    &grp->sg_lock);
1176 			if (st->st_xprt == NULL)
1177 				LIST_REMOVE(st, st_ilink);
1178 
1179 			/*
1180 			 * Reduce worker thread count when idle.
1181 			 */
1182 			if (error == EWOULDBLOCK) {
1183 				if (!ismaster
1184 				    && (grp->sg_threadcount
1185 					> grp->sg_minthreads)
1186 					&& !st->st_xprt)
1187 					break;
1188 			} else if (error != 0) {
1189 				KASSERT(error == EINTR || error == ERESTART,
1190 				    ("non-signal error %d", error));
1191 				mtx_unlock(&grp->sg_lock);
1192 				p = curproc;
1193 				PROC_LOCK(p);
1194 				if (P_SHOULDSTOP(p) ||
1195 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1196 					thread_suspend_check(0);
1197 					PROC_UNLOCK(p);
1198 					mtx_lock(&grp->sg_lock);
1199 				} else {
1200 					PROC_UNLOCK(p);
1201 					svc_exit(pool);
1202 					mtx_lock(&grp->sg_lock);
1203 					break;
1204 				}
1205 			}
1206 			continue;
1207 		}
1208 		mtx_unlock(&grp->sg_lock);
1209 
1210 		/*
1211 		 * Drain the transport socket and queue up any RPCs.
1212 		 */
1213 		xprt->xp_lastactive = time_uptime;
1214 		do {
1215 			if (!svc_request_space_available(pool))
1216 				break;
1217 			rqstp = NULL;
1218 			stat = svc_getreq(xprt, &rqstp);
1219 			if (rqstp) {
1220 				svc_change_space_used(pool, rqstp->rq_size);
1221 				/*
1222 				 * See if the application has a preference
1223 				 * for some other thread.
1224 				 */
1225 				if (pool->sp_assign) {
1226 					stpref = pool->sp_assign(st, rqstp);
1227 					rqstp->rq_thread = stpref;
1228 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1229 					    rqstp, rq_link);
1230 					mtx_unlock(&stpref->st_lock);
1231 					if (stpref != st)
1232 						rqstp = NULL;
1233 				} else {
1234 					rqstp->rq_thread = st;
1235 					STAILQ_INSERT_TAIL(&st->st_reqs,
1236 					    rqstp, rq_link);
1237 				}
1238 			}
1239 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1240 		    && grp->sg_state != SVCPOOL_CLOSING);
1241 
1242 		/*
1243 		 * Move this transport to the end of the active list to
1244 		 * ensure fairness when multiple transports are active.
1245 		 * If this was the last queued request, svc_getreq will end
1246 		 * up calling xprt_inactive to remove from the active list.
1247 		 */
1248 		mtx_lock(&grp->sg_lock);
1249 		xprt->xp_thread = NULL;
1250 		st->st_xprt = NULL;
1251 		if (xprt->xp_active) {
1252 			if (!svc_request_space_available(pool) ||
1253 			    !xprt_assignthread(xprt))
1254 				TAILQ_INSERT_TAIL(&grp->sg_active,
1255 				    xprt, xp_alink);
1256 		}
1257 		mtx_unlock(&grp->sg_lock);
1258 		SVC_RELEASE(xprt);
1259 
1260 		/*
1261 		 * Execute what we have queued.
1262 		 */
1263 		mtx_lock(&st->st_lock);
1264 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1265 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1266 			mtx_unlock(&st->st_lock);
1267 			sz = (long)rqstp->rq_size;
1268 			svc_executereq(rqstp);
1269 			svc_change_space_used(pool, -sz);
1270 			mtx_lock(&st->st_lock);
1271 		}
1272 		mtx_unlock(&st->st_lock);
1273 		mtx_lock(&grp->sg_lock);
1274 	}
1275 
1276 	if (st->st_xprt) {
1277 		xprt = st->st_xprt;
1278 		st->st_xprt = NULL;
1279 		SVC_RELEASE(xprt);
1280 	}
1281 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1282 	mtx_destroy(&st->st_lock);
1283 	cv_destroy(&st->st_cond);
1284 	mem_free(st, sizeof(*st));
1285 
1286 	grp->sg_threadcount--;
1287 	if (!ismaster)
1288 		wakeup(grp);
1289 	mtx_unlock(&grp->sg_lock);
1290 }
1291 
1292 static void
1293 svc_thread_start(void *arg)
1294 {
1295 
1296 	svc_run_internal((SVCGROUP *) arg, FALSE);
1297 	kthread_exit();
1298 }
1299 
1300 static void
1301 svc_new_thread(SVCGROUP *grp)
1302 {
1303 	SVCPOOL *pool = grp->sg_pool;
1304 	struct thread *td;
1305 
1306 	mtx_lock(&grp->sg_lock);
1307 	grp->sg_threadcount++;
1308 	mtx_unlock(&grp->sg_lock);
1309 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1310 	    "%s: service", pool->sp_name);
1311 }
1312 
1313 void
1314 svc_run(SVCPOOL *pool)
1315 {
1316 	int g, i;
1317 	struct proc *p;
1318 	struct thread *td;
1319 	SVCGROUP *grp;
1320 
1321 	p = curproc;
1322 	td = curthread;
1323 	snprintf(td->td_name, sizeof(td->td_name),
1324 	    "%s: master", pool->sp_name);
1325 	pool->sp_state = SVCPOOL_ACTIVE;
1326 	pool->sp_proc = p;
1327 
1328 	/* Choose group count based on number of threads and CPUs. */
1329 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1330 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1331 	for (g = 0; g < pool->sp_groupcount; g++) {
1332 		grp = &pool->sp_groups[g];
1333 		grp->sg_minthreads = max(1,
1334 		    pool->sp_minthreads / pool->sp_groupcount);
1335 		grp->sg_maxthreads = max(1,
1336 		    pool->sp_maxthreads / pool->sp_groupcount);
1337 		grp->sg_lastcreatetime = time_uptime;
1338 	}
1339 
1340 	/* Starting threads */
1341 	pool->sp_groups[0].sg_threadcount++;
1342 	for (g = 0; g < pool->sp_groupcount; g++) {
1343 		grp = &pool->sp_groups[g];
1344 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1345 			svc_new_thread(grp);
1346 	}
1347 	svc_run_internal(&pool->sp_groups[0], TRUE);
1348 
1349 	/* Waiting for threads to stop. */
1350 	for (g = 0; g < pool->sp_groupcount; g++) {
1351 		grp = &pool->sp_groups[g];
1352 		mtx_lock(&grp->sg_lock);
1353 		while (grp->sg_threadcount > 0)
1354 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1355 		mtx_unlock(&grp->sg_lock);
1356 	}
1357 }
1358 
1359 void
1360 svc_exit(SVCPOOL *pool)
1361 {
1362 	SVCGROUP *grp;
1363 	SVCTHREAD *st;
1364 	int g;
1365 
1366 	pool->sp_state = SVCPOOL_CLOSING;
1367 	for (g = 0; g < pool->sp_groupcount; g++) {
1368 		grp = &pool->sp_groups[g];
1369 		mtx_lock(&grp->sg_lock);
1370 		if (grp->sg_state != SVCPOOL_CLOSING) {
1371 			grp->sg_state = SVCPOOL_CLOSING;
1372 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1373 				cv_signal(&st->st_cond);
1374 		}
1375 		mtx_unlock(&grp->sg_lock);
1376 	}
1377 }
1378 
1379 bool_t
1380 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1381 {
1382 	struct mbuf *m;
1383 	XDR xdrs;
1384 	bool_t stat;
1385 
1386 	m = rqstp->rq_args;
1387 	rqstp->rq_args = NULL;
1388 
1389 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1390 	stat = xargs(&xdrs, args);
1391 	XDR_DESTROY(&xdrs);
1392 
1393 	return (stat);
1394 }
1395 
1396 bool_t
1397 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1398 {
1399 	XDR xdrs;
1400 
1401 	if (rqstp->rq_addr) {
1402 		free(rqstp->rq_addr, M_SONAME);
1403 		rqstp->rq_addr = NULL;
1404 	}
1405 
1406 	xdrs.x_op = XDR_FREE;
1407 	return (xargs(&xdrs, args));
1408 }
1409 
1410 void
1411 svc_freereq(struct svc_req *rqstp)
1412 {
1413 	SVCTHREAD *st;
1414 	SVCPOOL *pool;
1415 
1416 	st = rqstp->rq_thread;
1417 	if (st) {
1418 		pool = st->st_pool;
1419 		if (pool->sp_done)
1420 			pool->sp_done(st, rqstp);
1421 	}
1422 
1423 	if (rqstp->rq_auth.svc_ah_ops)
1424 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1425 
1426 	if (rqstp->rq_xprt) {
1427 		SVC_RELEASE(rqstp->rq_xprt);
1428 	}
1429 
1430 	if (rqstp->rq_addr)
1431 		free(rqstp->rq_addr, M_SONAME);
1432 
1433 	if (rqstp->rq_args)
1434 		m_freem(rqstp->rq_args);
1435 
1436 	free(rqstp, M_RPC);
1437 }
1438