xref: /freebsd/sys/rpc/svc.c (revision e64fe029e9d3ce476e77a478318e0c3cd201ff08)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39 
40 /*
41  * svc.c, Server-side remote procedure call interface.
42  *
43  * There are two sets of procedures here.  The xprt routines are
44  * for handling transport handles.  The svc routines handle the
45  * list of service routines.
46  *
47  * Copyright (C) 1984, Sun Microsystems, Inc.
48  */
49 
50 #include <sys/param.h>
51 #include <sys/jail.h>
52 #include <sys/lock.h>
53 #include <sys/kernel.h>
54 #include <sys/kthread.h>
55 #include <sys/malloc.h>
56 #include <sys/mbuf.h>
57 #include <sys/mutex.h>
58 #include <sys/proc.h>
59 #include <sys/queue.h>
60 #include <sys/socketvar.h>
61 #include <sys/systm.h>
62 #include <sys/smp.h>
63 #include <sys/sx.h>
64 #include <sys/ucred.h>
65 
66 #include <rpc/rpc.h>
67 #include <rpc/rpcb_clnt.h>
68 #include <rpc/replay.h>
69 
70 #include <rpc/rpc_com.h>
71 
72 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
73 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
74 
75 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
76     char *);
77 static void svc_new_thread(SVCGROUP *grp);
78 static void xprt_unregister_locked(SVCXPRT *xprt);
79 static void svc_change_space_used(SVCPOOL *pool, long delta);
80 static bool_t svc_request_space_available(SVCPOOL *pool);
81 static void svcpool_cleanup(SVCPOOL *pool);
82 
83 /* ***************  SVCXPRT related stuff **************** */
84 
85 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
87 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
88 
89 SVCPOOL*
90 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
91 {
92 	SVCPOOL *pool;
93 	SVCGROUP *grp;
94 	int g;
95 
96 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
97 
98 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
99 	pool->sp_name = name;
100 	pool->sp_state = SVCPOOL_INIT;
101 	pool->sp_proc = NULL;
102 	TAILQ_INIT(&pool->sp_callouts);
103 	TAILQ_INIT(&pool->sp_lcallouts);
104 	pool->sp_minthreads = 1;
105 	pool->sp_maxthreads = 1;
106 	pool->sp_groupcount = 1;
107 	for (g = 0; g < SVC_MAXGROUPS; g++) {
108 		grp = &pool->sp_groups[g];
109 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
110 		grp->sg_pool = pool;
111 		grp->sg_state = SVCPOOL_ACTIVE;
112 		TAILQ_INIT(&grp->sg_xlist);
113 		TAILQ_INIT(&grp->sg_active);
114 		LIST_INIT(&grp->sg_idlethreads);
115 		grp->sg_minthreads = 1;
116 		grp->sg_maxthreads = 1;
117 	}
118 
119 	/*
120 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
121 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
122 	 * on LP64 architectures, so cast to u_long to avoid undefined
123 	 * behavior.  (ILP32 architectures cannot have nmbclusters
124 	 * large enough to overflow for other reasons.)
125 	 */
126 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
127 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
128 
129 	sysctl_ctx_init(&pool->sp_sysctl);
130 	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
131 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
133 		    pool, 0, svcpool_minthread_sysctl, "I",
134 		    "Minimal number of threads");
135 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
137 		    pool, 0, svcpool_maxthread_sysctl, "I",
138 		    "Maximal number of threads");
139 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
141 		    pool, 0, svcpool_threads_sysctl, "I",
142 		    "Current number of threads");
143 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
145 		    "Number of thread groups");
146 
147 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
148 		    "request_space_used", CTLFLAG_RD,
149 		    &pool->sp_space_used,
150 		    "Space in parsed but not handled requests.");
151 
152 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
153 		    "request_space_used_highest", CTLFLAG_RD,
154 		    &pool->sp_space_used_highest,
155 		    "Highest space used since reboot.");
156 
157 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
158 		    "request_space_high", CTLFLAG_RW,
159 		    &pool->sp_space_high,
160 		    "Maximum space in parsed but not handled requests.");
161 
162 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
163 		    "request_space_low", CTLFLAG_RW,
164 		    &pool->sp_space_low,
165 		    "Low water mark for request space.");
166 
167 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
168 		    "request_space_throttled", CTLFLAG_RD,
169 		    &pool->sp_space_throttled, 0,
170 		    "Whether nfs requests are currently throttled");
171 
172 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
173 		    "request_space_throttle_count", CTLFLAG_RD,
174 		    &pool->sp_space_throttle_count, 0,
175 		    "Count of times throttling based on request space has occurred");
176 	}
177 
178 	return pool;
179 }
180 
181 /*
182  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
183  * the pool data structures.
184  */
185 static void
186 svcpool_cleanup(SVCPOOL *pool)
187 {
188 	SVCGROUP *grp;
189 	SVCXPRT *xprt, *nxprt;
190 	struct svc_callout *s;
191 	struct svc_loss_callout *sl;
192 	struct svcxprt_list cleanup;
193 	int g;
194 
195 	TAILQ_INIT(&cleanup);
196 
197 	for (g = 0; g < SVC_MAXGROUPS; g++) {
198 		grp = &pool->sp_groups[g];
199 		mtx_lock(&grp->sg_lock);
200 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
201 			xprt_unregister_locked(xprt);
202 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
203 		}
204 		mtx_unlock(&grp->sg_lock);
205 	}
206 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
207 		if (xprt->xp_socket != NULL)
208 			soshutdown(xprt->xp_socket, SHUT_WR);
209 		SVC_RELEASE(xprt);
210 	}
211 
212 	mtx_lock(&pool->sp_lock);
213 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
214 		mtx_unlock(&pool->sp_lock);
215 		svc_unreg(pool, s->sc_prog, s->sc_vers);
216 		mtx_lock(&pool->sp_lock);
217 	}
218 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
219 		mtx_unlock(&pool->sp_lock);
220 		svc_loss_unreg(pool, sl->slc_dispatch);
221 		mtx_lock(&pool->sp_lock);
222 	}
223 	mtx_unlock(&pool->sp_lock);
224 }
225 
226 void
227 svcpool_destroy(SVCPOOL *pool)
228 {
229 	SVCGROUP *grp;
230 	int g;
231 
232 	svcpool_cleanup(pool);
233 
234 	for (g = 0; g < SVC_MAXGROUPS; g++) {
235 		grp = &pool->sp_groups[g];
236 		mtx_destroy(&grp->sg_lock);
237 	}
238 	mtx_destroy(&pool->sp_lock);
239 
240 	if (pool->sp_rcache)
241 		replay_freecache(pool->sp_rcache);
242 
243 	sysctl_ctx_free(&pool->sp_sysctl);
244 	free(pool, M_RPC);
245 }
246 
247 /*
248  * Similar to svcpool_destroy(), except that it does not destroy the actual
249  * data structures.  As such, "pool" may be used again.
250  */
251 void
252 svcpool_close(SVCPOOL *pool)
253 {
254 	SVCGROUP *grp;
255 	int g;
256 
257 	svcpool_cleanup(pool);
258 
259 	/* Now, initialize the pool's state for a fresh svc_run() call. */
260 	mtx_lock(&pool->sp_lock);
261 	pool->sp_state = SVCPOOL_INIT;
262 	mtx_unlock(&pool->sp_lock);
263 	for (g = 0; g < SVC_MAXGROUPS; g++) {
264 		grp = &pool->sp_groups[g];
265 		mtx_lock(&grp->sg_lock);
266 		grp->sg_state = SVCPOOL_ACTIVE;
267 		mtx_unlock(&grp->sg_lock);
268 	}
269 }
270 
271 /*
272  * Sysctl handler to get the present thread count on a pool
273  */
274 static int
275 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
276 {
277 	SVCPOOL *pool;
278 	int threads, error, g;
279 
280 	pool = oidp->oid_arg1;
281 	threads = 0;
282 	mtx_lock(&pool->sp_lock);
283 	for (g = 0; g < pool->sp_groupcount; g++)
284 		threads += pool->sp_groups[g].sg_threadcount;
285 	mtx_unlock(&pool->sp_lock);
286 	error = sysctl_handle_int(oidp, &threads, 0, req);
287 	return (error);
288 }
289 
290 /*
291  * Sysctl handler to set the minimum thread count on a pool
292  */
293 static int
294 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
295 {
296 	SVCPOOL *pool;
297 	int newminthreads, error, g;
298 
299 	pool = oidp->oid_arg1;
300 	newminthreads = pool->sp_minthreads;
301 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
302 	if (error == 0 && newminthreads != pool->sp_minthreads) {
303 		if (newminthreads > pool->sp_maxthreads)
304 			return (EINVAL);
305 		mtx_lock(&pool->sp_lock);
306 		pool->sp_minthreads = newminthreads;
307 		for (g = 0; g < pool->sp_groupcount; g++) {
308 			pool->sp_groups[g].sg_minthreads = max(1,
309 			    pool->sp_minthreads / pool->sp_groupcount);
310 		}
311 		mtx_unlock(&pool->sp_lock);
312 	}
313 	return (error);
314 }
315 
316 /*
317  * Sysctl handler to set the maximum thread count on a pool
318  */
319 static int
320 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
321 {
322 	SVCPOOL *pool;
323 	int newmaxthreads, error, g;
324 
325 	pool = oidp->oid_arg1;
326 	newmaxthreads = pool->sp_maxthreads;
327 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
328 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
329 		if (newmaxthreads < pool->sp_minthreads)
330 			return (EINVAL);
331 		mtx_lock(&pool->sp_lock);
332 		pool->sp_maxthreads = newmaxthreads;
333 		for (g = 0; g < pool->sp_groupcount; g++) {
334 			pool->sp_groups[g].sg_maxthreads = max(1,
335 			    pool->sp_maxthreads / pool->sp_groupcount);
336 		}
337 		mtx_unlock(&pool->sp_lock);
338 	}
339 	return (error);
340 }
341 
342 /*
343  * Activate a transport handle.
344  */
345 void
346 xprt_register(SVCXPRT *xprt)
347 {
348 	SVCPOOL *pool = xprt->xp_pool;
349 	SVCGROUP *grp;
350 	int g;
351 
352 	SVC_ACQUIRE(xprt);
353 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
354 	xprt->xp_group = grp = &pool->sp_groups[g];
355 	mtx_lock(&grp->sg_lock);
356 	xprt->xp_registered = TRUE;
357 	xprt->xp_active = FALSE;
358 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
359 	mtx_unlock(&grp->sg_lock);
360 }
361 
362 /*
363  * De-activate a transport handle. Note: the locked version doesn't
364  * release the transport - caller must do that after dropping the pool
365  * lock.
366  */
367 static void
368 xprt_unregister_locked(SVCXPRT *xprt)
369 {
370 	SVCGROUP *grp = xprt->xp_group;
371 
372 	mtx_assert(&grp->sg_lock, MA_OWNED);
373 	KASSERT(xprt->xp_registered == TRUE,
374 	    ("xprt_unregister_locked: not registered"));
375 	xprt_inactive_locked(xprt);
376 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
377 	xprt->xp_registered = FALSE;
378 }
379 
380 void
381 xprt_unregister(SVCXPRT *xprt)
382 {
383 	SVCGROUP *grp = xprt->xp_group;
384 
385 	mtx_lock(&grp->sg_lock);
386 	if (xprt->xp_registered == FALSE) {
387 		/* Already unregistered by another thread */
388 		mtx_unlock(&grp->sg_lock);
389 		return;
390 	}
391 	xprt_unregister_locked(xprt);
392 	mtx_unlock(&grp->sg_lock);
393 
394 	if (xprt->xp_socket != NULL)
395 		soshutdown(xprt->xp_socket, SHUT_WR);
396 	SVC_RELEASE(xprt);
397 }
398 
399 /*
400  * Attempt to assign a service thread to this transport.
401  */
402 static int
403 xprt_assignthread(SVCXPRT *xprt)
404 {
405 	SVCGROUP *grp = xprt->xp_group;
406 	SVCTHREAD *st;
407 
408 	mtx_assert(&grp->sg_lock, MA_OWNED);
409 	st = LIST_FIRST(&grp->sg_idlethreads);
410 	if (st) {
411 		LIST_REMOVE(st, st_ilink);
412 		SVC_ACQUIRE(xprt);
413 		xprt->xp_thread = st;
414 		st->st_xprt = xprt;
415 		cv_signal(&st->st_cond);
416 		return (TRUE);
417 	} else {
418 		/*
419 		 * See if we can create a new thread. The
420 		 * actual thread creation happens in
421 		 * svc_run_internal because our locking state
422 		 * is poorly defined (we are typically called
423 		 * from a socket upcall). Don't create more
424 		 * than one thread per second.
425 		 */
426 		if (grp->sg_state == SVCPOOL_ACTIVE
427 		    && grp->sg_lastcreatetime < time_uptime
428 		    && grp->sg_threadcount < grp->sg_maxthreads) {
429 			grp->sg_state = SVCPOOL_THREADWANTED;
430 		}
431 	}
432 	return (FALSE);
433 }
434 
435 void
436 xprt_active(SVCXPRT *xprt)
437 {
438 	SVCGROUP *grp = xprt->xp_group;
439 
440 	mtx_lock(&grp->sg_lock);
441 
442 	if (!xprt->xp_registered) {
443 		/*
444 		 * Race with xprt_unregister - we lose.
445 		 */
446 		mtx_unlock(&grp->sg_lock);
447 		return;
448 	}
449 
450 	if (!xprt->xp_active) {
451 		xprt->xp_active = TRUE;
452 		if (xprt->xp_thread == NULL) {
453 			if (!svc_request_space_available(xprt->xp_pool) ||
454 			    !xprt_assignthread(xprt))
455 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
456 				    xp_alink);
457 		}
458 	}
459 
460 	mtx_unlock(&grp->sg_lock);
461 }
462 
463 void
464 xprt_inactive_locked(SVCXPRT *xprt)
465 {
466 	SVCGROUP *grp = xprt->xp_group;
467 
468 	mtx_assert(&grp->sg_lock, MA_OWNED);
469 	if (xprt->xp_active) {
470 		if (xprt->xp_thread == NULL)
471 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
472 		xprt->xp_active = FALSE;
473 	}
474 }
475 
476 void
477 xprt_inactive(SVCXPRT *xprt)
478 {
479 	SVCGROUP *grp = xprt->xp_group;
480 
481 	mtx_lock(&grp->sg_lock);
482 	xprt_inactive_locked(xprt);
483 	mtx_unlock(&grp->sg_lock);
484 }
485 
486 /*
487  * Variant of xprt_inactive() for use only when sure that port is
488  * assigned to thread. For example, within receive handlers.
489  */
490 void
491 xprt_inactive_self(SVCXPRT *xprt)
492 {
493 
494 	KASSERT(xprt->xp_thread != NULL,
495 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
496 	xprt->xp_active = FALSE;
497 }
498 
499 /*
500  * Add a service program to the callout list.
501  * The dispatch routine will be called when a rpc request for this
502  * program number comes in.
503  */
504 bool_t
505 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
506     void (*dispatch)(struct svc_req *, SVCXPRT *),
507     const struct netconfig *nconf)
508 {
509 	SVCPOOL *pool = xprt->xp_pool;
510 	struct svc_callout *s;
511 	char *netid = NULL;
512 	int flag = 0;
513 
514 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
515 
516 	if (xprt->xp_netid) {
517 		netid = strdup(xprt->xp_netid, M_RPC);
518 		flag = 1;
519 	} else if (nconf && nconf->nc_netid) {
520 		netid = strdup(nconf->nc_netid, M_RPC);
521 		flag = 1;
522 	} /* must have been created with svc_raw_create */
523 	if ((netid == NULL) && (flag == 1)) {
524 		return (FALSE);
525 	}
526 
527 	mtx_lock(&pool->sp_lock);
528 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
529 		if (netid)
530 			free(netid, M_RPC);
531 		if (s->sc_dispatch == dispatch)
532 			goto rpcb_it; /* he is registering another xptr */
533 		mtx_unlock(&pool->sp_lock);
534 		return (FALSE);
535 	}
536 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
537 	if (s == NULL) {
538 		if (netid)
539 			free(netid, M_RPC);
540 		mtx_unlock(&pool->sp_lock);
541 		return (FALSE);
542 	}
543 
544 	s->sc_prog = prog;
545 	s->sc_vers = vers;
546 	s->sc_dispatch = dispatch;
547 	s->sc_netid = netid;
548 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
549 
550 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
551 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
552 
553 rpcb_it:
554 	mtx_unlock(&pool->sp_lock);
555 	/* now register the information with the local binder service */
556 	if (nconf) {
557 		bool_t dummy;
558 		struct netconfig tnc;
559 		struct netbuf nb;
560 		tnc = *nconf;
561 		nb.buf = &xprt->xp_ltaddr;
562 		nb.len = xprt->xp_ltaddr.ss_len;
563 		dummy = rpcb_set(prog, vers, &tnc, &nb);
564 		return (dummy);
565 	}
566 	return (TRUE);
567 }
568 
569 /*
570  * Remove a service program from the callout list.
571  */
572 void
573 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
574 {
575 	struct svc_callout *s;
576 
577 	/* unregister the information anyway */
578 	(void) rpcb_unset(prog, vers, NULL);
579 	mtx_lock(&pool->sp_lock);
580 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
581 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
582 		if (s->sc_netid)
583 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
584 		mem_free(s, sizeof (struct svc_callout));
585 	}
586 	mtx_unlock(&pool->sp_lock);
587 }
588 
589 /*
590  * Add a service connection loss program to the callout list.
591  * The dispatch routine will be called when some port in ths pool die.
592  */
593 bool_t
594 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
595 {
596 	SVCPOOL *pool = xprt->xp_pool;
597 	struct svc_loss_callout *s;
598 
599 	mtx_lock(&pool->sp_lock);
600 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
601 		if (s->slc_dispatch == dispatch)
602 			break;
603 	}
604 	if (s != NULL) {
605 		mtx_unlock(&pool->sp_lock);
606 		return (TRUE);
607 	}
608 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
609 	if (s == NULL) {
610 		mtx_unlock(&pool->sp_lock);
611 		return (FALSE);
612 	}
613 	s->slc_dispatch = dispatch;
614 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
615 	mtx_unlock(&pool->sp_lock);
616 	return (TRUE);
617 }
618 
619 /*
620  * Remove a service connection loss program from the callout list.
621  */
622 void
623 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
624 {
625 	struct svc_loss_callout *s;
626 
627 	mtx_lock(&pool->sp_lock);
628 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
629 		if (s->slc_dispatch == dispatch) {
630 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
631 			free(s, M_RPC);
632 			break;
633 		}
634 	}
635 	mtx_unlock(&pool->sp_lock);
636 }
637 
638 /* ********************** CALLOUT list related stuff ************* */
639 
640 /*
641  * Search the callout list for a program number, return the callout
642  * struct.
643  */
644 static struct svc_callout *
645 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
646 {
647 	struct svc_callout *s;
648 
649 	mtx_assert(&pool->sp_lock, MA_OWNED);
650 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
651 		if (s->sc_prog == prog && s->sc_vers == vers
652 		    && (netid == NULL || s->sc_netid == NULL ||
653 			strcmp(netid, s->sc_netid) == 0))
654 			break;
655 	}
656 
657 	return (s);
658 }
659 
660 /* ******************* REPLY GENERATION ROUTINES  ************ */
661 
662 static bool_t
663 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
664     struct mbuf *body)
665 {
666 	SVCXPRT *xprt = rqstp->rq_xprt;
667 	bool_t ok;
668 
669 	if (rqstp->rq_args) {
670 		m_freem(rqstp->rq_args);
671 		rqstp->rq_args = NULL;
672 	}
673 
674 	if (xprt->xp_pool->sp_rcache)
675 		replay_setreply(xprt->xp_pool->sp_rcache,
676 		    rply, svc_getrpccaller(rqstp), body);
677 
678 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
679 		return (FALSE);
680 
681 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
682 	if (rqstp->rq_addr) {
683 		free(rqstp->rq_addr, M_SONAME);
684 		rqstp->rq_addr = NULL;
685 	}
686 
687 	return (ok);
688 }
689 
690 /*
691  * Send a reply to an rpc request
692  */
693 bool_t
694 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
695 {
696 	struct rpc_msg rply;
697 	struct mbuf *m;
698 	XDR xdrs;
699 	bool_t ok;
700 
701 	rply.rm_xid = rqstp->rq_xid;
702 	rply.rm_direction = REPLY;
703 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
704 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
705 	rply.acpted_rply.ar_stat = SUCCESS;
706 	rply.acpted_rply.ar_results.where = NULL;
707 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
708 
709 	m = m_getcl(M_WAITOK, MT_DATA, 0);
710 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
711 	ok = xdr_results(&xdrs, xdr_location);
712 	XDR_DESTROY(&xdrs);
713 
714 	if (ok) {
715 		return (svc_sendreply_common(rqstp, &rply, m));
716 	} else {
717 		m_freem(m);
718 		return (FALSE);
719 	}
720 }
721 
722 bool_t
723 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
724 {
725 	struct rpc_msg rply;
726 
727 	rply.rm_xid = rqstp->rq_xid;
728 	rply.rm_direction = REPLY;
729 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
730 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
731 	rply.acpted_rply.ar_stat = SUCCESS;
732 	rply.acpted_rply.ar_results.where = NULL;
733 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
734 
735 	return (svc_sendreply_common(rqstp, &rply, m));
736 }
737 
738 /*
739  * No procedure error reply
740  */
741 void
742 svcerr_noproc(struct svc_req *rqstp)
743 {
744 	SVCXPRT *xprt = rqstp->rq_xprt;
745 	struct rpc_msg rply;
746 
747 	rply.rm_xid = rqstp->rq_xid;
748 	rply.rm_direction = REPLY;
749 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
750 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
751 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
752 
753 	if (xprt->xp_pool->sp_rcache)
754 		replay_setreply(xprt->xp_pool->sp_rcache,
755 		    &rply, svc_getrpccaller(rqstp), NULL);
756 
757 	svc_sendreply_common(rqstp, &rply, NULL);
758 }
759 
760 /*
761  * Can't decode args error reply
762  */
763 void
764 svcerr_decode(struct svc_req *rqstp)
765 {
766 	SVCXPRT *xprt = rqstp->rq_xprt;
767 	struct rpc_msg rply;
768 
769 	rply.rm_xid = rqstp->rq_xid;
770 	rply.rm_direction = REPLY;
771 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
772 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
773 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
774 
775 	if (xprt->xp_pool->sp_rcache)
776 		replay_setreply(xprt->xp_pool->sp_rcache,
777 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
778 
779 	svc_sendreply_common(rqstp, &rply, NULL);
780 }
781 
782 /*
783  * Some system error
784  */
785 void
786 svcerr_systemerr(struct svc_req *rqstp)
787 {
788 	SVCXPRT *xprt = rqstp->rq_xprt;
789 	struct rpc_msg rply;
790 
791 	rply.rm_xid = rqstp->rq_xid;
792 	rply.rm_direction = REPLY;
793 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
794 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
795 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
796 
797 	if (xprt->xp_pool->sp_rcache)
798 		replay_setreply(xprt->xp_pool->sp_rcache,
799 		    &rply, svc_getrpccaller(rqstp), NULL);
800 
801 	svc_sendreply_common(rqstp, &rply, NULL);
802 }
803 
804 /*
805  * Authentication error reply
806  */
807 void
808 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
809 {
810 	SVCXPRT *xprt = rqstp->rq_xprt;
811 	struct rpc_msg rply;
812 
813 	rply.rm_xid = rqstp->rq_xid;
814 	rply.rm_direction = REPLY;
815 	rply.rm_reply.rp_stat = MSG_DENIED;
816 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
817 	rply.rjcted_rply.rj_why = why;
818 
819 	if (xprt->xp_pool->sp_rcache)
820 		replay_setreply(xprt->xp_pool->sp_rcache,
821 		    &rply, svc_getrpccaller(rqstp), NULL);
822 
823 	svc_sendreply_common(rqstp, &rply, NULL);
824 }
825 
826 /*
827  * Auth too weak error reply
828  */
829 void
830 svcerr_weakauth(struct svc_req *rqstp)
831 {
832 
833 	svcerr_auth(rqstp, AUTH_TOOWEAK);
834 }
835 
836 /*
837  * Program unavailable error reply
838  */
839 void
840 svcerr_noprog(struct svc_req *rqstp)
841 {
842 	SVCXPRT *xprt = rqstp->rq_xprt;
843 	struct rpc_msg rply;
844 
845 	rply.rm_xid = rqstp->rq_xid;
846 	rply.rm_direction = REPLY;
847 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
848 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
849 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
850 
851 	if (xprt->xp_pool->sp_rcache)
852 		replay_setreply(xprt->xp_pool->sp_rcache,
853 		    &rply, svc_getrpccaller(rqstp), NULL);
854 
855 	svc_sendreply_common(rqstp, &rply, NULL);
856 }
857 
858 /*
859  * Program version mismatch error reply
860  */
861 void
862 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
863 {
864 	SVCXPRT *xprt = rqstp->rq_xprt;
865 	struct rpc_msg rply;
866 
867 	rply.rm_xid = rqstp->rq_xid;
868 	rply.rm_direction = REPLY;
869 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
870 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
871 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
872 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
873 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
874 
875 	if (xprt->xp_pool->sp_rcache)
876 		replay_setreply(xprt->xp_pool->sp_rcache,
877 		    &rply, svc_getrpccaller(rqstp), NULL);
878 
879 	svc_sendreply_common(rqstp, &rply, NULL);
880 }
881 
882 /*
883  * Allocate a new server transport structure. All fields are
884  * initialized to zero and xp_p3 is initialized to point at an
885  * extension structure to hold various flags and authentication
886  * parameters.
887  */
888 SVCXPRT *
889 svc_xprt_alloc(void)
890 {
891 	SVCXPRT *xprt;
892 	SVCXPRT_EXT *ext;
893 
894 	xprt = mem_alloc(sizeof(SVCXPRT));
895 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
896 	xprt->xp_p3 = ext;
897 	refcount_init(&xprt->xp_refs, 1);
898 
899 	return (xprt);
900 }
901 
902 /*
903  * Free a server transport structure.
904  */
905 void
906 svc_xprt_free(SVCXPRT *xprt)
907 {
908 
909 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
910 	/* The size argument is ignored, so 0 is ok. */
911 	mem_free(xprt->xp_gidp, 0);
912 	mem_free(xprt, sizeof(SVCXPRT));
913 }
914 
915 /* ******************* SERVER INPUT STUFF ******************* */
916 
917 /*
918  * Read RPC requests from a transport and queue them to be
919  * executed. We handle authentication and replay cache replies here.
920  * Actually dispatching the RPC is deferred till svc_executereq.
921  */
922 static enum xprt_stat
923 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
924 {
925 	SVCPOOL *pool = xprt->xp_pool;
926 	struct svc_req *r;
927 	struct rpc_msg msg;
928 	struct mbuf *args;
929 	struct svc_loss_callout *s;
930 	enum xprt_stat stat;
931 
932 	/* now receive msgs from xprtprt (support batch calls) */
933 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
934 
935 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
936 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
937 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
938 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
939 		enum auth_stat why;
940 
941 		/*
942 		 * Handle replays and authenticate before queuing the
943 		 * request to be executed.
944 		 */
945 		SVC_ACQUIRE(xprt);
946 		r->rq_xprt = xprt;
947 		if (pool->sp_rcache) {
948 			struct rpc_msg repmsg;
949 			struct mbuf *repbody;
950 			enum replay_state rs;
951 			rs = replay_find(pool->sp_rcache, &msg,
952 			    svc_getrpccaller(r), &repmsg, &repbody);
953 			switch (rs) {
954 			case RS_NEW:
955 				break;
956 			case RS_DONE:
957 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
958 				    repbody, &r->rq_reply_seq);
959 				if (r->rq_addr) {
960 					free(r->rq_addr, M_SONAME);
961 					r->rq_addr = NULL;
962 				}
963 				m_freem(args);
964 				goto call_done;
965 
966 			default:
967 				m_freem(args);
968 				goto call_done;
969 			}
970 		}
971 
972 		r->rq_xid = msg.rm_xid;
973 		r->rq_prog = msg.rm_call.cb_prog;
974 		r->rq_vers = msg.rm_call.cb_vers;
975 		r->rq_proc = msg.rm_call.cb_proc;
976 		r->rq_size = sizeof(*r) + m_length(args, NULL);
977 		r->rq_args = args;
978 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
979 			/*
980 			 * RPCSEC_GSS uses this return code
981 			 * for requests that form part of its
982 			 * context establishment protocol and
983 			 * should not be dispatched to the
984 			 * application.
985 			 */
986 			if (why != RPCSEC_GSS_NODISPATCH)
987 				svcerr_auth(r, why);
988 			goto call_done;
989 		}
990 
991 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
992 			svcerr_decode(r);
993 			goto call_done;
994 		}
995 
996 		/*
997 		 * Everything checks out, return request to caller.
998 		 */
999 		*rqstp_ret = r;
1000 		r = NULL;
1001 	}
1002 call_done:
1003 	if (r) {
1004 		svc_freereq(r);
1005 		r = NULL;
1006 	}
1007 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1008 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1009 			(*s->slc_dispatch)(xprt);
1010 		xprt_unregister(xprt);
1011 	}
1012 
1013 	return (stat);
1014 }
1015 
1016 static void
1017 svc_executereq(struct svc_req *rqstp)
1018 {
1019 	SVCXPRT *xprt = rqstp->rq_xprt;
1020 	SVCPOOL *pool = xprt->xp_pool;
1021 	int prog_found;
1022 	rpcvers_t low_vers;
1023 	rpcvers_t high_vers;
1024 	struct svc_callout *s;
1025 
1026 	/* now match message with a registered service*/
1027 	prog_found = FALSE;
1028 	low_vers = (rpcvers_t) -1L;
1029 	high_vers = (rpcvers_t) 0L;
1030 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1031 		if (s->sc_prog == rqstp->rq_prog) {
1032 			if (s->sc_vers == rqstp->rq_vers) {
1033 				/*
1034 				 * We hand ownership of r to the
1035 				 * dispatch method - they must call
1036 				 * svc_freereq.
1037 				 */
1038 				(*s->sc_dispatch)(rqstp, xprt);
1039 				return;
1040 			}  /* found correct version */
1041 			prog_found = TRUE;
1042 			if (s->sc_vers < low_vers)
1043 				low_vers = s->sc_vers;
1044 			if (s->sc_vers > high_vers)
1045 				high_vers = s->sc_vers;
1046 		}   /* found correct program */
1047 	}
1048 
1049 	/*
1050 	 * if we got here, the program or version
1051 	 * is not served ...
1052 	 */
1053 	if (prog_found)
1054 		svcerr_progvers(rqstp, low_vers, high_vers);
1055 	else
1056 		svcerr_noprog(rqstp);
1057 
1058 	svc_freereq(rqstp);
1059 }
1060 
1061 static void
1062 svc_checkidle(SVCGROUP *grp)
1063 {
1064 	SVCXPRT *xprt, *nxprt;
1065 	time_t timo;
1066 	struct svcxprt_list cleanup;
1067 
1068 	TAILQ_INIT(&cleanup);
1069 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1070 		/*
1071 		 * Only some transports have idle timers. Don't time
1072 		 * something out which is just waking up.
1073 		 */
1074 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1075 			continue;
1076 
1077 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1078 		if (time_uptime > timo) {
1079 			xprt_unregister_locked(xprt);
1080 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1081 		}
1082 	}
1083 
1084 	mtx_unlock(&grp->sg_lock);
1085 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1086 		soshutdown(xprt->xp_socket, SHUT_WR);
1087 		SVC_RELEASE(xprt);
1088 	}
1089 	mtx_lock(&grp->sg_lock);
1090 }
1091 
1092 static void
1093 svc_assign_waiting_sockets(SVCPOOL *pool)
1094 {
1095 	SVCGROUP *grp;
1096 	SVCXPRT *xprt;
1097 	int g;
1098 
1099 	for (g = 0; g < pool->sp_groupcount; g++) {
1100 		grp = &pool->sp_groups[g];
1101 		mtx_lock(&grp->sg_lock);
1102 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1103 			if (xprt_assignthread(xprt))
1104 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1105 			else
1106 				break;
1107 		}
1108 		mtx_unlock(&grp->sg_lock);
1109 	}
1110 }
1111 
1112 static void
1113 svc_change_space_used(SVCPOOL *pool, long delta)
1114 {
1115 	unsigned long value;
1116 
1117 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1118 	if (delta > 0) {
1119 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1120 			pool->sp_space_throttled = TRUE;
1121 			pool->sp_space_throttle_count++;
1122 		}
1123 		if (value > pool->sp_space_used_highest)
1124 			pool->sp_space_used_highest = value;
1125 	} else {
1126 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1127 			pool->sp_space_throttled = FALSE;
1128 			svc_assign_waiting_sockets(pool);
1129 		}
1130 	}
1131 }
1132 
1133 static bool_t
1134 svc_request_space_available(SVCPOOL *pool)
1135 {
1136 
1137 	if (pool->sp_space_throttled)
1138 		return (FALSE);
1139 	return (TRUE);
1140 }
1141 
1142 static void
1143 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1144 {
1145 	SVCPOOL *pool = grp->sg_pool;
1146 	SVCTHREAD *st, *stpref;
1147 	SVCXPRT *xprt;
1148 	enum xprt_stat stat;
1149 	struct svc_req *rqstp;
1150 	struct proc *p;
1151 	long sz;
1152 	int error;
1153 
1154 	st = mem_alloc(sizeof(*st));
1155 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1156 	st->st_pool = pool;
1157 	st->st_xprt = NULL;
1158 	STAILQ_INIT(&st->st_reqs);
1159 	cv_init(&st->st_cond, "rpcsvc");
1160 
1161 	mtx_lock(&grp->sg_lock);
1162 
1163 	/*
1164 	 * If we are a new thread which was spawned to cope with
1165 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1166 	 */
1167 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1168 		grp->sg_state = SVCPOOL_ACTIVE;
1169 
1170 	while (grp->sg_state != SVCPOOL_CLOSING) {
1171 		/*
1172 		 * Create new thread if requested.
1173 		 */
1174 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1175 			grp->sg_state = SVCPOOL_THREADSTARTING;
1176 			grp->sg_lastcreatetime = time_uptime;
1177 			mtx_unlock(&grp->sg_lock);
1178 			svc_new_thread(grp);
1179 			mtx_lock(&grp->sg_lock);
1180 			continue;
1181 		}
1182 
1183 		/*
1184 		 * Check for idle transports once per second.
1185 		 */
1186 		if (time_uptime > grp->sg_lastidlecheck) {
1187 			grp->sg_lastidlecheck = time_uptime;
1188 			svc_checkidle(grp);
1189 		}
1190 
1191 		xprt = st->st_xprt;
1192 		if (!xprt) {
1193 			/*
1194 			 * Enforce maxthreads count.
1195 			 */
1196 			if (!ismaster && grp->sg_threadcount >
1197 			    grp->sg_maxthreads)
1198 				break;
1199 
1200 			/*
1201 			 * Before sleeping, see if we can find an
1202 			 * active transport which isn't being serviced
1203 			 * by a thread.
1204 			 */
1205 			if (svc_request_space_available(pool) &&
1206 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1207 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1208 				SVC_ACQUIRE(xprt);
1209 				xprt->xp_thread = st;
1210 				st->st_xprt = xprt;
1211 				continue;
1212 			}
1213 
1214 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1215 			if (ismaster || (!ismaster &&
1216 			    grp->sg_threadcount > grp->sg_minthreads))
1217 				error = cv_timedwait_sig(&st->st_cond,
1218 				    &grp->sg_lock, 5 * hz);
1219 			else
1220 				error = cv_wait_sig(&st->st_cond,
1221 				    &grp->sg_lock);
1222 			if (st->st_xprt == NULL)
1223 				LIST_REMOVE(st, st_ilink);
1224 
1225 			/*
1226 			 * Reduce worker thread count when idle.
1227 			 */
1228 			if (error == EWOULDBLOCK) {
1229 				if (!ismaster
1230 				    && (grp->sg_threadcount
1231 					> grp->sg_minthreads)
1232 					&& !st->st_xprt)
1233 					break;
1234 			} else if (error != 0) {
1235 				KASSERT(error == EINTR || error == ERESTART,
1236 				    ("non-signal error %d", error));
1237 				mtx_unlock(&grp->sg_lock);
1238 				p = curproc;
1239 				PROC_LOCK(p);
1240 				if (P_SHOULDSTOP(p) ||
1241 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1242 					thread_suspend_check(0);
1243 					PROC_UNLOCK(p);
1244 					mtx_lock(&grp->sg_lock);
1245 				} else {
1246 					PROC_UNLOCK(p);
1247 					svc_exit(pool);
1248 					mtx_lock(&grp->sg_lock);
1249 					break;
1250 				}
1251 			}
1252 			continue;
1253 		}
1254 		mtx_unlock(&grp->sg_lock);
1255 
1256 		/*
1257 		 * Drain the transport socket and queue up any RPCs.
1258 		 */
1259 		xprt->xp_lastactive = time_uptime;
1260 		do {
1261 			if (!svc_request_space_available(pool))
1262 				break;
1263 			rqstp = NULL;
1264 			stat = svc_getreq(xprt, &rqstp);
1265 			if (rqstp) {
1266 				svc_change_space_used(pool, rqstp->rq_size);
1267 				/*
1268 				 * See if the application has a preference
1269 				 * for some other thread.
1270 				 */
1271 				if (pool->sp_assign) {
1272 					stpref = pool->sp_assign(st, rqstp);
1273 					rqstp->rq_thread = stpref;
1274 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1275 					    rqstp, rq_link);
1276 					mtx_unlock(&stpref->st_lock);
1277 					if (stpref != st)
1278 						rqstp = NULL;
1279 				} else {
1280 					rqstp->rq_thread = st;
1281 					STAILQ_INSERT_TAIL(&st->st_reqs,
1282 					    rqstp, rq_link);
1283 				}
1284 			}
1285 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1286 		    && grp->sg_state != SVCPOOL_CLOSING);
1287 
1288 		/*
1289 		 * Move this transport to the end of the active list to
1290 		 * ensure fairness when multiple transports are active.
1291 		 * If this was the last queued request, svc_getreq will end
1292 		 * up calling xprt_inactive to remove from the active list.
1293 		 */
1294 		mtx_lock(&grp->sg_lock);
1295 		xprt->xp_thread = NULL;
1296 		st->st_xprt = NULL;
1297 		if (xprt->xp_active) {
1298 			if (!svc_request_space_available(pool) ||
1299 			    !xprt_assignthread(xprt))
1300 				TAILQ_INSERT_TAIL(&grp->sg_active,
1301 				    xprt, xp_alink);
1302 		}
1303 		mtx_unlock(&grp->sg_lock);
1304 		SVC_RELEASE(xprt);
1305 
1306 		/*
1307 		 * Execute what we have queued.
1308 		 */
1309 		mtx_lock(&st->st_lock);
1310 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1311 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1312 			mtx_unlock(&st->st_lock);
1313 			sz = (long)rqstp->rq_size;
1314 			svc_executereq(rqstp);
1315 			svc_change_space_used(pool, -sz);
1316 			mtx_lock(&st->st_lock);
1317 		}
1318 		mtx_unlock(&st->st_lock);
1319 		mtx_lock(&grp->sg_lock);
1320 	}
1321 
1322 	if (st->st_xprt) {
1323 		xprt = st->st_xprt;
1324 		st->st_xprt = NULL;
1325 		SVC_RELEASE(xprt);
1326 	}
1327 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1328 	mtx_destroy(&st->st_lock);
1329 	cv_destroy(&st->st_cond);
1330 	mem_free(st, sizeof(*st));
1331 
1332 	grp->sg_threadcount--;
1333 	if (!ismaster)
1334 		wakeup(grp);
1335 	mtx_unlock(&grp->sg_lock);
1336 }
1337 
1338 static void
1339 svc_thread_start(void *arg)
1340 {
1341 
1342 	svc_run_internal((SVCGROUP *) arg, FALSE);
1343 	kthread_exit();
1344 }
1345 
1346 static void
1347 svc_new_thread(SVCGROUP *grp)
1348 {
1349 	SVCPOOL *pool = grp->sg_pool;
1350 	struct thread *td;
1351 
1352 	mtx_lock(&grp->sg_lock);
1353 	grp->sg_threadcount++;
1354 	mtx_unlock(&grp->sg_lock);
1355 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1356 	    "%s: service", pool->sp_name);
1357 }
1358 
1359 void
1360 svc_run(SVCPOOL *pool)
1361 {
1362 	int g, i;
1363 	struct proc *p;
1364 	struct thread *td;
1365 	SVCGROUP *grp;
1366 
1367 	p = curproc;
1368 	td = curthread;
1369 	snprintf(td->td_name, sizeof(td->td_name),
1370 	    "%s: master", pool->sp_name);
1371 	pool->sp_state = SVCPOOL_ACTIVE;
1372 	pool->sp_proc = p;
1373 
1374 	/* Choose group count based on number of threads and CPUs. */
1375 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1376 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1377 	for (g = 0; g < pool->sp_groupcount; g++) {
1378 		grp = &pool->sp_groups[g];
1379 		grp->sg_minthreads = max(1,
1380 		    pool->sp_minthreads / pool->sp_groupcount);
1381 		grp->sg_maxthreads = max(1,
1382 		    pool->sp_maxthreads / pool->sp_groupcount);
1383 		grp->sg_lastcreatetime = time_uptime;
1384 	}
1385 
1386 	/* Starting threads */
1387 	pool->sp_groups[0].sg_threadcount++;
1388 	for (g = 0; g < pool->sp_groupcount; g++) {
1389 		grp = &pool->sp_groups[g];
1390 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1391 			svc_new_thread(grp);
1392 	}
1393 	svc_run_internal(&pool->sp_groups[0], TRUE);
1394 
1395 	/* Waiting for threads to stop. */
1396 	for (g = 0; g < pool->sp_groupcount; g++) {
1397 		grp = &pool->sp_groups[g];
1398 		mtx_lock(&grp->sg_lock);
1399 		while (grp->sg_threadcount > 0)
1400 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1401 		mtx_unlock(&grp->sg_lock);
1402 	}
1403 }
1404 
1405 void
1406 svc_exit(SVCPOOL *pool)
1407 {
1408 	SVCGROUP *grp;
1409 	SVCTHREAD *st;
1410 	int g;
1411 
1412 	pool->sp_state = SVCPOOL_CLOSING;
1413 	for (g = 0; g < pool->sp_groupcount; g++) {
1414 		grp = &pool->sp_groups[g];
1415 		mtx_lock(&grp->sg_lock);
1416 		if (grp->sg_state != SVCPOOL_CLOSING) {
1417 			grp->sg_state = SVCPOOL_CLOSING;
1418 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1419 				cv_signal(&st->st_cond);
1420 		}
1421 		mtx_unlock(&grp->sg_lock);
1422 	}
1423 }
1424 
1425 bool_t
1426 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1427 {
1428 	struct mbuf *m;
1429 	XDR xdrs;
1430 	bool_t stat;
1431 
1432 	m = rqstp->rq_args;
1433 	rqstp->rq_args = NULL;
1434 
1435 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1436 	stat = xargs(&xdrs, args);
1437 	XDR_DESTROY(&xdrs);
1438 
1439 	return (stat);
1440 }
1441 
1442 bool_t
1443 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1444 {
1445 	XDR xdrs;
1446 
1447 	if (rqstp->rq_addr) {
1448 		free(rqstp->rq_addr, M_SONAME);
1449 		rqstp->rq_addr = NULL;
1450 	}
1451 
1452 	xdrs.x_op = XDR_FREE;
1453 	return (xargs(&xdrs, args));
1454 }
1455 
1456 void
1457 svc_freereq(struct svc_req *rqstp)
1458 {
1459 	SVCTHREAD *st;
1460 	SVCPOOL *pool;
1461 
1462 	st = rqstp->rq_thread;
1463 	if (st) {
1464 		pool = st->st_pool;
1465 		if (pool->sp_done)
1466 			pool->sp_done(st, rqstp);
1467 	}
1468 
1469 	if (rqstp->rq_auth.svc_ah_ops)
1470 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1471 
1472 	if (rqstp->rq_xprt) {
1473 		SVC_RELEASE(rqstp->rq_xprt);
1474 	}
1475 
1476 	if (rqstp->rq_addr)
1477 		free(rqstp->rq_addr, M_SONAME);
1478 
1479 	if (rqstp->rq_args)
1480 		m_freem(rqstp->rq_args);
1481 
1482 	free(rqstp, M_RPC);
1483 }
1484