xref: /freebsd/sys/rpc/svc.c (revision d13def78ccef6dbc25c2e197089ee5fc4d7b82c3)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39 
40 /*
41  * svc.c, Server-side remote procedure call interface.
42  *
43  * There are two sets of procedures here.  The xprt routines are
44  * for handling transport handles.  The svc routines handle the
45  * list of service routines.
46  *
47  * Copyright (C) 1984, Sun Microsystems, Inc.
48  */
49 
50 #include <sys/param.h>
51 #include <sys/lock.h>
52 #include <sys/kernel.h>
53 #include <sys/kthread.h>
54 #include <sys/malloc.h>
55 #include <sys/mbuf.h>
56 #include <sys/mutex.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/socketvar.h>
60 #include <sys/systm.h>
61 #include <sys/smp.h>
62 #include <sys/sx.h>
63 #include <sys/ucred.h>
64 
65 #include <rpc/rpc.h>
66 #include <rpc/rpcb_clnt.h>
67 #include <rpc/replay.h>
68 
69 #include <rpc/rpc_com.h>
70 
71 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
73 
74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
75     char *);
76 static void svc_new_thread(SVCGROUP *grp);
77 static void xprt_unregister_locked(SVCXPRT *xprt);
78 static void svc_change_space_used(SVCPOOL *pool, long delta);
79 static bool_t svc_request_space_available(SVCPOOL *pool);
80 static void svcpool_cleanup(SVCPOOL *pool);
81 
82 /* ***************  SVCXPRT related stuff **************** */
83 
84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87 
88 SVCPOOL*
89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
90 {
91 	SVCPOOL *pool;
92 	SVCGROUP *grp;
93 	int g;
94 
95 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
96 
97 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
98 	pool->sp_name = name;
99 	pool->sp_state = SVCPOOL_INIT;
100 	pool->sp_proc = NULL;
101 	TAILQ_INIT(&pool->sp_callouts);
102 	TAILQ_INIT(&pool->sp_lcallouts);
103 	pool->sp_minthreads = 1;
104 	pool->sp_maxthreads = 1;
105 	pool->sp_groupcount = 1;
106 	for (g = 0; g < SVC_MAXGROUPS; g++) {
107 		grp = &pool->sp_groups[g];
108 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
109 		grp->sg_pool = pool;
110 		grp->sg_state = SVCPOOL_ACTIVE;
111 		TAILQ_INIT(&grp->sg_xlist);
112 		TAILQ_INIT(&grp->sg_active);
113 		LIST_INIT(&grp->sg_idlethreads);
114 		grp->sg_minthreads = 1;
115 		grp->sg_maxthreads = 1;
116 	}
117 
118 	/*
119 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
120 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
121 	 * on LP64 architectures, so cast to u_long to avoid undefined
122 	 * behavior.  (ILP32 architectures cannot have nmbclusters
123 	 * large enough to overflow for other reasons.)
124 	 */
125 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
126 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
127 
128 	sysctl_ctx_init(&pool->sp_sysctl);
129 	if (sysctl_base) {
130 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
132 		    pool, 0, svcpool_minthread_sysctl, "I",
133 		    "Minimal number of threads");
134 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
136 		    pool, 0, svcpool_maxthread_sysctl, "I",
137 		    "Maximal number of threads");
138 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139 		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
140 		    pool, 0, svcpool_threads_sysctl, "I",
141 		    "Current number of threads");
142 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
144 		    "Number of thread groups");
145 
146 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 		    "request_space_used", CTLFLAG_RD,
148 		    &pool->sp_space_used,
149 		    "Space in parsed but not handled requests.");
150 
151 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152 		    "request_space_used_highest", CTLFLAG_RD,
153 		    &pool->sp_space_used_highest,
154 		    "Highest space used since reboot.");
155 
156 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157 		    "request_space_high", CTLFLAG_RW,
158 		    &pool->sp_space_high,
159 		    "Maximum space in parsed but not handled requests.");
160 
161 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162 		    "request_space_low", CTLFLAG_RW,
163 		    &pool->sp_space_low,
164 		    "Low water mark for request space.");
165 
166 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167 		    "request_space_throttled", CTLFLAG_RD,
168 		    &pool->sp_space_throttled, 0,
169 		    "Whether nfs requests are currently throttled");
170 
171 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
172 		    "request_space_throttle_count", CTLFLAG_RD,
173 		    &pool->sp_space_throttle_count, 0,
174 		    "Count of times throttling based on request space has occurred");
175 	}
176 
177 	return pool;
178 }
179 
180 /*
181  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
182  * the pool data structures.
183  */
184 static void
185 svcpool_cleanup(SVCPOOL *pool)
186 {
187 	SVCGROUP *grp;
188 	SVCXPRT *xprt, *nxprt;
189 	struct svc_callout *s;
190 	struct svc_loss_callout *sl;
191 	struct svcxprt_list cleanup;
192 	int g;
193 
194 	TAILQ_INIT(&cleanup);
195 
196 	for (g = 0; g < SVC_MAXGROUPS; g++) {
197 		grp = &pool->sp_groups[g];
198 		mtx_lock(&grp->sg_lock);
199 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
200 			xprt_unregister_locked(xprt);
201 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
202 		}
203 		mtx_unlock(&grp->sg_lock);
204 	}
205 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
206 		SVC_RELEASE(xprt);
207 	}
208 
209 	mtx_lock(&pool->sp_lock);
210 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
211 		mtx_unlock(&pool->sp_lock);
212 		svc_unreg(pool, s->sc_prog, s->sc_vers);
213 		mtx_lock(&pool->sp_lock);
214 	}
215 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
216 		mtx_unlock(&pool->sp_lock);
217 		svc_loss_unreg(pool, sl->slc_dispatch);
218 		mtx_lock(&pool->sp_lock);
219 	}
220 	mtx_unlock(&pool->sp_lock);
221 }
222 
223 void
224 svcpool_destroy(SVCPOOL *pool)
225 {
226 	SVCGROUP *grp;
227 	int g;
228 
229 	svcpool_cleanup(pool);
230 
231 	for (g = 0; g < SVC_MAXGROUPS; g++) {
232 		grp = &pool->sp_groups[g];
233 		mtx_destroy(&grp->sg_lock);
234 	}
235 	mtx_destroy(&pool->sp_lock);
236 
237 	if (pool->sp_rcache)
238 		replay_freecache(pool->sp_rcache);
239 
240 	sysctl_ctx_free(&pool->sp_sysctl);
241 	free(pool, M_RPC);
242 }
243 
244 /*
245  * Similar to svcpool_destroy(), except that it does not destroy the actual
246  * data structures.  As such, "pool" may be used again.
247  */
248 void
249 svcpool_close(SVCPOOL *pool)
250 {
251 	SVCGROUP *grp;
252 	int g;
253 
254 	svcpool_cleanup(pool);
255 
256 	/* Now, initialize the pool's state for a fresh svc_run() call. */
257 	mtx_lock(&pool->sp_lock);
258 	pool->sp_state = SVCPOOL_INIT;
259 	mtx_unlock(&pool->sp_lock);
260 	for (g = 0; g < SVC_MAXGROUPS; g++) {
261 		grp = &pool->sp_groups[g];
262 		mtx_lock(&grp->sg_lock);
263 		grp->sg_state = SVCPOOL_ACTIVE;
264 		mtx_unlock(&grp->sg_lock);
265 	}
266 }
267 
268 /*
269  * Sysctl handler to get the present thread count on a pool
270  */
271 static int
272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
273 {
274 	SVCPOOL *pool;
275 	int threads, error, g;
276 
277 	pool = oidp->oid_arg1;
278 	threads = 0;
279 	mtx_lock(&pool->sp_lock);
280 	for (g = 0; g < pool->sp_groupcount; g++)
281 		threads += pool->sp_groups[g].sg_threadcount;
282 	mtx_unlock(&pool->sp_lock);
283 	error = sysctl_handle_int(oidp, &threads, 0, req);
284 	return (error);
285 }
286 
287 /*
288  * Sysctl handler to set the minimum thread count on a pool
289  */
290 static int
291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
292 {
293 	SVCPOOL *pool;
294 	int newminthreads, error, g;
295 
296 	pool = oidp->oid_arg1;
297 	newminthreads = pool->sp_minthreads;
298 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
299 	if (error == 0 && newminthreads != pool->sp_minthreads) {
300 		if (newminthreads > pool->sp_maxthreads)
301 			return (EINVAL);
302 		mtx_lock(&pool->sp_lock);
303 		pool->sp_minthreads = newminthreads;
304 		for (g = 0; g < pool->sp_groupcount; g++) {
305 			pool->sp_groups[g].sg_minthreads = max(1,
306 			    pool->sp_minthreads / pool->sp_groupcount);
307 		}
308 		mtx_unlock(&pool->sp_lock);
309 	}
310 	return (error);
311 }
312 
313 /*
314  * Sysctl handler to set the maximum thread count on a pool
315  */
316 static int
317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
318 {
319 	SVCPOOL *pool;
320 	int newmaxthreads, error, g;
321 
322 	pool = oidp->oid_arg1;
323 	newmaxthreads = pool->sp_maxthreads;
324 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
325 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
326 		if (newmaxthreads < pool->sp_minthreads)
327 			return (EINVAL);
328 		mtx_lock(&pool->sp_lock);
329 		pool->sp_maxthreads = newmaxthreads;
330 		for (g = 0; g < pool->sp_groupcount; g++) {
331 			pool->sp_groups[g].sg_maxthreads = max(1,
332 			    pool->sp_maxthreads / pool->sp_groupcount);
333 		}
334 		mtx_unlock(&pool->sp_lock);
335 	}
336 	return (error);
337 }
338 
339 /*
340  * Activate a transport handle.
341  */
342 void
343 xprt_register(SVCXPRT *xprt)
344 {
345 	SVCPOOL *pool = xprt->xp_pool;
346 	SVCGROUP *grp;
347 	int g;
348 
349 	SVC_ACQUIRE(xprt);
350 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
351 	xprt->xp_group = grp = &pool->sp_groups[g];
352 	mtx_lock(&grp->sg_lock);
353 	xprt->xp_registered = TRUE;
354 	xprt->xp_active = FALSE;
355 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
356 	mtx_unlock(&grp->sg_lock);
357 }
358 
359 /*
360  * De-activate a transport handle. Note: the locked version doesn't
361  * release the transport - caller must do that after dropping the pool
362  * lock.
363  */
364 static void
365 xprt_unregister_locked(SVCXPRT *xprt)
366 {
367 	SVCGROUP *grp = xprt->xp_group;
368 
369 	mtx_assert(&grp->sg_lock, MA_OWNED);
370 	KASSERT(xprt->xp_registered == TRUE,
371 	    ("xprt_unregister_locked: not registered"));
372 	xprt_inactive_locked(xprt);
373 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
374 	xprt->xp_registered = FALSE;
375 }
376 
377 void
378 xprt_unregister(SVCXPRT *xprt)
379 {
380 	SVCGROUP *grp = xprt->xp_group;
381 
382 	mtx_lock(&grp->sg_lock);
383 	if (xprt->xp_registered == FALSE) {
384 		/* Already unregistered by another thread */
385 		mtx_unlock(&grp->sg_lock);
386 		return;
387 	}
388 	xprt_unregister_locked(xprt);
389 	mtx_unlock(&grp->sg_lock);
390 
391 	SVC_RELEASE(xprt);
392 }
393 
394 /*
395  * Attempt to assign a service thread to this transport.
396  */
397 static int
398 xprt_assignthread(SVCXPRT *xprt)
399 {
400 	SVCGROUP *grp = xprt->xp_group;
401 	SVCTHREAD *st;
402 
403 	mtx_assert(&grp->sg_lock, MA_OWNED);
404 	st = LIST_FIRST(&grp->sg_idlethreads);
405 	if (st) {
406 		LIST_REMOVE(st, st_ilink);
407 		SVC_ACQUIRE(xprt);
408 		xprt->xp_thread = st;
409 		st->st_xprt = xprt;
410 		cv_signal(&st->st_cond);
411 		return (TRUE);
412 	} else {
413 		/*
414 		 * See if we can create a new thread. The
415 		 * actual thread creation happens in
416 		 * svc_run_internal because our locking state
417 		 * is poorly defined (we are typically called
418 		 * from a socket upcall). Don't create more
419 		 * than one thread per second.
420 		 */
421 		if (grp->sg_state == SVCPOOL_ACTIVE
422 		    && grp->sg_lastcreatetime < time_uptime
423 		    && grp->sg_threadcount < grp->sg_maxthreads) {
424 			grp->sg_state = SVCPOOL_THREADWANTED;
425 		}
426 	}
427 	return (FALSE);
428 }
429 
430 void
431 xprt_active(SVCXPRT *xprt)
432 {
433 	SVCGROUP *grp = xprt->xp_group;
434 
435 	mtx_lock(&grp->sg_lock);
436 
437 	if (!xprt->xp_registered) {
438 		/*
439 		 * Race with xprt_unregister - we lose.
440 		 */
441 		mtx_unlock(&grp->sg_lock);
442 		return;
443 	}
444 
445 	if (!xprt->xp_active) {
446 		xprt->xp_active = TRUE;
447 		if (xprt->xp_thread == NULL) {
448 			if (!svc_request_space_available(xprt->xp_pool) ||
449 			    !xprt_assignthread(xprt))
450 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
451 				    xp_alink);
452 		}
453 	}
454 
455 	mtx_unlock(&grp->sg_lock);
456 }
457 
458 void
459 xprt_inactive_locked(SVCXPRT *xprt)
460 {
461 	SVCGROUP *grp = xprt->xp_group;
462 
463 	mtx_assert(&grp->sg_lock, MA_OWNED);
464 	if (xprt->xp_active) {
465 		if (xprt->xp_thread == NULL)
466 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
467 		xprt->xp_active = FALSE;
468 	}
469 }
470 
471 void
472 xprt_inactive(SVCXPRT *xprt)
473 {
474 	SVCGROUP *grp = xprt->xp_group;
475 
476 	mtx_lock(&grp->sg_lock);
477 	xprt_inactive_locked(xprt);
478 	mtx_unlock(&grp->sg_lock);
479 }
480 
481 /*
482  * Variant of xprt_inactive() for use only when sure that port is
483  * assigned to thread. For example, within receive handlers.
484  */
485 void
486 xprt_inactive_self(SVCXPRT *xprt)
487 {
488 
489 	KASSERT(xprt->xp_thread != NULL,
490 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
491 	xprt->xp_active = FALSE;
492 }
493 
494 /*
495  * Add a service program to the callout list.
496  * The dispatch routine will be called when a rpc request for this
497  * program number comes in.
498  */
499 bool_t
500 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
501     void (*dispatch)(struct svc_req *, SVCXPRT *),
502     const struct netconfig *nconf)
503 {
504 	SVCPOOL *pool = xprt->xp_pool;
505 	struct svc_callout *s;
506 	char *netid = NULL;
507 	int flag = 0;
508 
509 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
510 
511 	if (xprt->xp_netid) {
512 		netid = strdup(xprt->xp_netid, M_RPC);
513 		flag = 1;
514 	} else if (nconf && nconf->nc_netid) {
515 		netid = strdup(nconf->nc_netid, M_RPC);
516 		flag = 1;
517 	} /* must have been created with svc_raw_create */
518 	if ((netid == NULL) && (flag == 1)) {
519 		return (FALSE);
520 	}
521 
522 	mtx_lock(&pool->sp_lock);
523 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
524 		if (netid)
525 			free(netid, M_RPC);
526 		if (s->sc_dispatch == dispatch)
527 			goto rpcb_it; /* he is registering another xptr */
528 		mtx_unlock(&pool->sp_lock);
529 		return (FALSE);
530 	}
531 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
532 	if (s == NULL) {
533 		if (netid)
534 			free(netid, M_RPC);
535 		mtx_unlock(&pool->sp_lock);
536 		return (FALSE);
537 	}
538 
539 	s->sc_prog = prog;
540 	s->sc_vers = vers;
541 	s->sc_dispatch = dispatch;
542 	s->sc_netid = netid;
543 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
544 
545 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
546 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
547 
548 rpcb_it:
549 	mtx_unlock(&pool->sp_lock);
550 	/* now register the information with the local binder service */
551 	if (nconf) {
552 		bool_t dummy;
553 		struct netconfig tnc;
554 		struct netbuf nb;
555 		tnc = *nconf;
556 		nb.buf = &xprt->xp_ltaddr;
557 		nb.len = xprt->xp_ltaddr.ss_len;
558 		dummy = rpcb_set(prog, vers, &tnc, &nb);
559 		return (dummy);
560 	}
561 	return (TRUE);
562 }
563 
564 /*
565  * Remove a service program from the callout list.
566  */
567 void
568 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
569 {
570 	struct svc_callout *s;
571 
572 	/* unregister the information anyway */
573 	(void) rpcb_unset(prog, vers, NULL);
574 	mtx_lock(&pool->sp_lock);
575 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
576 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
577 		if (s->sc_netid)
578 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
579 		mem_free(s, sizeof (struct svc_callout));
580 	}
581 	mtx_unlock(&pool->sp_lock);
582 }
583 
584 /*
585  * Add a service connection loss program to the callout list.
586  * The dispatch routine will be called when some port in ths pool die.
587  */
588 bool_t
589 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
590 {
591 	SVCPOOL *pool = xprt->xp_pool;
592 	struct svc_loss_callout *s;
593 
594 	mtx_lock(&pool->sp_lock);
595 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
596 		if (s->slc_dispatch == dispatch)
597 			break;
598 	}
599 	if (s != NULL) {
600 		mtx_unlock(&pool->sp_lock);
601 		return (TRUE);
602 	}
603 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
604 	if (s == NULL) {
605 		mtx_unlock(&pool->sp_lock);
606 		return (FALSE);
607 	}
608 	s->slc_dispatch = dispatch;
609 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
610 	mtx_unlock(&pool->sp_lock);
611 	return (TRUE);
612 }
613 
614 /*
615  * Remove a service connection loss program from the callout list.
616  */
617 void
618 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
619 {
620 	struct svc_loss_callout *s;
621 
622 	mtx_lock(&pool->sp_lock);
623 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
624 		if (s->slc_dispatch == dispatch) {
625 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
626 			free(s, M_RPC);
627 			break;
628 		}
629 	}
630 	mtx_unlock(&pool->sp_lock);
631 }
632 
633 /* ********************** CALLOUT list related stuff ************* */
634 
635 /*
636  * Search the callout list for a program number, return the callout
637  * struct.
638  */
639 static struct svc_callout *
640 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
641 {
642 	struct svc_callout *s;
643 
644 	mtx_assert(&pool->sp_lock, MA_OWNED);
645 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
646 		if (s->sc_prog == prog && s->sc_vers == vers
647 		    && (netid == NULL || s->sc_netid == NULL ||
648 			strcmp(netid, s->sc_netid) == 0))
649 			break;
650 	}
651 
652 	return (s);
653 }
654 
655 /* ******************* REPLY GENERATION ROUTINES  ************ */
656 
657 static bool_t
658 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
659     struct mbuf *body)
660 {
661 	SVCXPRT *xprt = rqstp->rq_xprt;
662 	bool_t ok;
663 
664 	if (rqstp->rq_args) {
665 		m_freem(rqstp->rq_args);
666 		rqstp->rq_args = NULL;
667 	}
668 
669 	if (xprt->xp_pool->sp_rcache)
670 		replay_setreply(xprt->xp_pool->sp_rcache,
671 		    rply, svc_getrpccaller(rqstp), body);
672 
673 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
674 		return (FALSE);
675 
676 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
677 	if (rqstp->rq_addr) {
678 		free(rqstp->rq_addr, M_SONAME);
679 		rqstp->rq_addr = NULL;
680 	}
681 
682 	return (ok);
683 }
684 
685 /*
686  * Send a reply to an rpc request
687  */
688 bool_t
689 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
690 {
691 	struct rpc_msg rply;
692 	struct mbuf *m;
693 	XDR xdrs;
694 	bool_t ok;
695 
696 	rply.rm_xid = rqstp->rq_xid;
697 	rply.rm_direction = REPLY;
698 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
699 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
700 	rply.acpted_rply.ar_stat = SUCCESS;
701 	rply.acpted_rply.ar_results.where = NULL;
702 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
703 
704 	m = m_getcl(M_WAITOK, MT_DATA, 0);
705 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
706 	ok = xdr_results(&xdrs, xdr_location);
707 	XDR_DESTROY(&xdrs);
708 
709 	if (ok) {
710 		return (svc_sendreply_common(rqstp, &rply, m));
711 	} else {
712 		m_freem(m);
713 		return (FALSE);
714 	}
715 }
716 
717 bool_t
718 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
719 {
720 	struct rpc_msg rply;
721 
722 	rply.rm_xid = rqstp->rq_xid;
723 	rply.rm_direction = REPLY;
724 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
725 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
726 	rply.acpted_rply.ar_stat = SUCCESS;
727 	rply.acpted_rply.ar_results.where = NULL;
728 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
729 
730 	return (svc_sendreply_common(rqstp, &rply, m));
731 }
732 
733 /*
734  * No procedure error reply
735  */
736 void
737 svcerr_noproc(struct svc_req *rqstp)
738 {
739 	SVCXPRT *xprt = rqstp->rq_xprt;
740 	struct rpc_msg rply;
741 
742 	rply.rm_xid = rqstp->rq_xid;
743 	rply.rm_direction = REPLY;
744 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
745 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
746 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
747 
748 	if (xprt->xp_pool->sp_rcache)
749 		replay_setreply(xprt->xp_pool->sp_rcache,
750 		    &rply, svc_getrpccaller(rqstp), NULL);
751 
752 	svc_sendreply_common(rqstp, &rply, NULL);
753 }
754 
755 /*
756  * Can't decode args error reply
757  */
758 void
759 svcerr_decode(struct svc_req *rqstp)
760 {
761 	SVCXPRT *xprt = rqstp->rq_xprt;
762 	struct rpc_msg rply;
763 
764 	rply.rm_xid = rqstp->rq_xid;
765 	rply.rm_direction = REPLY;
766 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
767 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
768 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
769 
770 	if (xprt->xp_pool->sp_rcache)
771 		replay_setreply(xprt->xp_pool->sp_rcache,
772 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
773 
774 	svc_sendreply_common(rqstp, &rply, NULL);
775 }
776 
777 /*
778  * Some system error
779  */
780 void
781 svcerr_systemerr(struct svc_req *rqstp)
782 {
783 	SVCXPRT *xprt = rqstp->rq_xprt;
784 	struct rpc_msg rply;
785 
786 	rply.rm_xid = rqstp->rq_xid;
787 	rply.rm_direction = REPLY;
788 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
789 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
790 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
791 
792 	if (xprt->xp_pool->sp_rcache)
793 		replay_setreply(xprt->xp_pool->sp_rcache,
794 		    &rply, svc_getrpccaller(rqstp), NULL);
795 
796 	svc_sendreply_common(rqstp, &rply, NULL);
797 }
798 
799 /*
800  * Authentication error reply
801  */
802 void
803 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
804 {
805 	SVCXPRT *xprt = rqstp->rq_xprt;
806 	struct rpc_msg rply;
807 
808 	rply.rm_xid = rqstp->rq_xid;
809 	rply.rm_direction = REPLY;
810 	rply.rm_reply.rp_stat = MSG_DENIED;
811 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
812 	rply.rjcted_rply.rj_why = why;
813 
814 	if (xprt->xp_pool->sp_rcache)
815 		replay_setreply(xprt->xp_pool->sp_rcache,
816 		    &rply, svc_getrpccaller(rqstp), NULL);
817 
818 	svc_sendreply_common(rqstp, &rply, NULL);
819 }
820 
821 /*
822  * Auth too weak error reply
823  */
824 void
825 svcerr_weakauth(struct svc_req *rqstp)
826 {
827 
828 	svcerr_auth(rqstp, AUTH_TOOWEAK);
829 }
830 
831 /*
832  * Program unavailable error reply
833  */
834 void
835 svcerr_noprog(struct svc_req *rqstp)
836 {
837 	SVCXPRT *xprt = rqstp->rq_xprt;
838 	struct rpc_msg rply;
839 
840 	rply.rm_xid = rqstp->rq_xid;
841 	rply.rm_direction = REPLY;
842 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
843 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
844 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
845 
846 	if (xprt->xp_pool->sp_rcache)
847 		replay_setreply(xprt->xp_pool->sp_rcache,
848 		    &rply, svc_getrpccaller(rqstp), NULL);
849 
850 	svc_sendreply_common(rqstp, &rply, NULL);
851 }
852 
853 /*
854  * Program version mismatch error reply
855  */
856 void
857 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
858 {
859 	SVCXPRT *xprt = rqstp->rq_xprt;
860 	struct rpc_msg rply;
861 
862 	rply.rm_xid = rqstp->rq_xid;
863 	rply.rm_direction = REPLY;
864 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
865 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
866 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
867 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
868 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
869 
870 	if (xprt->xp_pool->sp_rcache)
871 		replay_setreply(xprt->xp_pool->sp_rcache,
872 		    &rply, svc_getrpccaller(rqstp), NULL);
873 
874 	svc_sendreply_common(rqstp, &rply, NULL);
875 }
876 
877 /*
878  * Allocate a new server transport structure. All fields are
879  * initialized to zero and xp_p3 is initialized to point at an
880  * extension structure to hold various flags and authentication
881  * parameters.
882  */
883 SVCXPRT *
884 svc_xprt_alloc(void)
885 {
886 	SVCXPRT *xprt;
887 	SVCXPRT_EXT *ext;
888 
889 	xprt = mem_alloc(sizeof(SVCXPRT));
890 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
891 	xprt->xp_p3 = ext;
892 	refcount_init(&xprt->xp_refs, 1);
893 
894 	return (xprt);
895 }
896 
897 /*
898  * Free a server transport structure.
899  */
900 void
901 svc_xprt_free(SVCXPRT *xprt)
902 {
903 
904 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
905 	mem_free(xprt, sizeof(SVCXPRT));
906 }
907 
908 /* ******************* SERVER INPUT STUFF ******************* */
909 
910 /*
911  * Read RPC requests from a transport and queue them to be
912  * executed. We handle authentication and replay cache replies here.
913  * Actually dispatching the RPC is deferred till svc_executereq.
914  */
915 static enum xprt_stat
916 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
917 {
918 	SVCPOOL *pool = xprt->xp_pool;
919 	struct svc_req *r;
920 	struct rpc_msg msg;
921 	struct mbuf *args;
922 	struct svc_loss_callout *s;
923 	enum xprt_stat stat;
924 
925 	/* now receive msgs from xprtprt (support batch calls) */
926 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
927 
928 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
929 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
930 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
931 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
932 		enum auth_stat why;
933 
934 		/*
935 		 * Handle replays and authenticate before queuing the
936 		 * request to be executed.
937 		 */
938 		SVC_ACQUIRE(xprt);
939 		r->rq_xprt = xprt;
940 		if (pool->sp_rcache) {
941 			struct rpc_msg repmsg;
942 			struct mbuf *repbody;
943 			enum replay_state rs;
944 			rs = replay_find(pool->sp_rcache, &msg,
945 			    svc_getrpccaller(r), &repmsg, &repbody);
946 			switch (rs) {
947 			case RS_NEW:
948 				break;
949 			case RS_DONE:
950 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
951 				    repbody, &r->rq_reply_seq);
952 				if (r->rq_addr) {
953 					free(r->rq_addr, M_SONAME);
954 					r->rq_addr = NULL;
955 				}
956 				m_freem(args);
957 				goto call_done;
958 
959 			default:
960 				m_freem(args);
961 				goto call_done;
962 			}
963 		}
964 
965 		r->rq_xid = msg.rm_xid;
966 		r->rq_prog = msg.rm_call.cb_prog;
967 		r->rq_vers = msg.rm_call.cb_vers;
968 		r->rq_proc = msg.rm_call.cb_proc;
969 		r->rq_size = sizeof(*r) + m_length(args, NULL);
970 		r->rq_args = args;
971 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
972 			/*
973 			 * RPCSEC_GSS uses this return code
974 			 * for requests that form part of its
975 			 * context establishment protocol and
976 			 * should not be dispatched to the
977 			 * application.
978 			 */
979 			if (why != RPCSEC_GSS_NODISPATCH)
980 				svcerr_auth(r, why);
981 			goto call_done;
982 		}
983 
984 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
985 			svcerr_decode(r);
986 			goto call_done;
987 		}
988 
989 		/*
990 		 * Everything checks out, return request to caller.
991 		 */
992 		*rqstp_ret = r;
993 		r = NULL;
994 	}
995 call_done:
996 	if (r) {
997 		svc_freereq(r);
998 		r = NULL;
999 	}
1000 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1001 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1002 			(*s->slc_dispatch)(xprt);
1003 		xprt_unregister(xprt);
1004 	}
1005 
1006 	return (stat);
1007 }
1008 
1009 static void
1010 svc_executereq(struct svc_req *rqstp)
1011 {
1012 	SVCXPRT *xprt = rqstp->rq_xprt;
1013 	SVCPOOL *pool = xprt->xp_pool;
1014 	int prog_found;
1015 	rpcvers_t low_vers;
1016 	rpcvers_t high_vers;
1017 	struct svc_callout *s;
1018 
1019 	/* now match message with a registered service*/
1020 	prog_found = FALSE;
1021 	low_vers = (rpcvers_t) -1L;
1022 	high_vers = (rpcvers_t) 0L;
1023 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1024 		if (s->sc_prog == rqstp->rq_prog) {
1025 			if (s->sc_vers == rqstp->rq_vers) {
1026 				/*
1027 				 * We hand ownership of r to the
1028 				 * dispatch method - they must call
1029 				 * svc_freereq.
1030 				 */
1031 				(*s->sc_dispatch)(rqstp, xprt);
1032 				return;
1033 			}  /* found correct version */
1034 			prog_found = TRUE;
1035 			if (s->sc_vers < low_vers)
1036 				low_vers = s->sc_vers;
1037 			if (s->sc_vers > high_vers)
1038 				high_vers = s->sc_vers;
1039 		}   /* found correct program */
1040 	}
1041 
1042 	/*
1043 	 * if we got here, the program or version
1044 	 * is not served ...
1045 	 */
1046 	if (prog_found)
1047 		svcerr_progvers(rqstp, low_vers, high_vers);
1048 	else
1049 		svcerr_noprog(rqstp);
1050 
1051 	svc_freereq(rqstp);
1052 }
1053 
1054 static void
1055 svc_checkidle(SVCGROUP *grp)
1056 {
1057 	SVCXPRT *xprt, *nxprt;
1058 	time_t timo;
1059 	struct svcxprt_list cleanup;
1060 
1061 	TAILQ_INIT(&cleanup);
1062 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1063 		/*
1064 		 * Only some transports have idle timers. Don't time
1065 		 * something out which is just waking up.
1066 		 */
1067 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1068 			continue;
1069 
1070 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1071 		if (time_uptime > timo) {
1072 			xprt_unregister_locked(xprt);
1073 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1074 		}
1075 	}
1076 
1077 	mtx_unlock(&grp->sg_lock);
1078 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1079 		SVC_RELEASE(xprt);
1080 	}
1081 	mtx_lock(&grp->sg_lock);
1082 }
1083 
1084 static void
1085 svc_assign_waiting_sockets(SVCPOOL *pool)
1086 {
1087 	SVCGROUP *grp;
1088 	SVCXPRT *xprt;
1089 	int g;
1090 
1091 	for (g = 0; g < pool->sp_groupcount; g++) {
1092 		grp = &pool->sp_groups[g];
1093 		mtx_lock(&grp->sg_lock);
1094 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1095 			if (xprt_assignthread(xprt))
1096 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1097 			else
1098 				break;
1099 		}
1100 		mtx_unlock(&grp->sg_lock);
1101 	}
1102 }
1103 
1104 static void
1105 svc_change_space_used(SVCPOOL *pool, long delta)
1106 {
1107 	unsigned long value;
1108 
1109 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1110 	if (delta > 0) {
1111 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1112 			pool->sp_space_throttled = TRUE;
1113 			pool->sp_space_throttle_count++;
1114 		}
1115 		if (value > pool->sp_space_used_highest)
1116 			pool->sp_space_used_highest = value;
1117 	} else {
1118 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1119 			pool->sp_space_throttled = FALSE;
1120 			svc_assign_waiting_sockets(pool);
1121 		}
1122 	}
1123 }
1124 
1125 static bool_t
1126 svc_request_space_available(SVCPOOL *pool)
1127 {
1128 
1129 	if (pool->sp_space_throttled)
1130 		return (FALSE);
1131 	return (TRUE);
1132 }
1133 
1134 static void
1135 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1136 {
1137 	SVCPOOL *pool = grp->sg_pool;
1138 	SVCTHREAD *st, *stpref;
1139 	SVCXPRT *xprt;
1140 	enum xprt_stat stat;
1141 	struct svc_req *rqstp;
1142 	struct proc *p;
1143 	long sz;
1144 	int error;
1145 
1146 	st = mem_alloc(sizeof(*st));
1147 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1148 	st->st_pool = pool;
1149 	st->st_xprt = NULL;
1150 	STAILQ_INIT(&st->st_reqs);
1151 	cv_init(&st->st_cond, "rpcsvc");
1152 
1153 	mtx_lock(&grp->sg_lock);
1154 
1155 	/*
1156 	 * If we are a new thread which was spawned to cope with
1157 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1158 	 */
1159 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1160 		grp->sg_state = SVCPOOL_ACTIVE;
1161 
1162 	while (grp->sg_state != SVCPOOL_CLOSING) {
1163 		/*
1164 		 * Create new thread if requested.
1165 		 */
1166 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1167 			grp->sg_state = SVCPOOL_THREADSTARTING;
1168 			grp->sg_lastcreatetime = time_uptime;
1169 			mtx_unlock(&grp->sg_lock);
1170 			svc_new_thread(grp);
1171 			mtx_lock(&grp->sg_lock);
1172 			continue;
1173 		}
1174 
1175 		/*
1176 		 * Check for idle transports once per second.
1177 		 */
1178 		if (time_uptime > grp->sg_lastidlecheck) {
1179 			grp->sg_lastidlecheck = time_uptime;
1180 			svc_checkidle(grp);
1181 		}
1182 
1183 		xprt = st->st_xprt;
1184 		if (!xprt) {
1185 			/*
1186 			 * Enforce maxthreads count.
1187 			 */
1188 			if (!ismaster && grp->sg_threadcount >
1189 			    grp->sg_maxthreads)
1190 				break;
1191 
1192 			/*
1193 			 * Before sleeping, see if we can find an
1194 			 * active transport which isn't being serviced
1195 			 * by a thread.
1196 			 */
1197 			if (svc_request_space_available(pool) &&
1198 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1199 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1200 				SVC_ACQUIRE(xprt);
1201 				xprt->xp_thread = st;
1202 				st->st_xprt = xprt;
1203 				continue;
1204 			}
1205 
1206 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1207 			if (ismaster || (!ismaster &&
1208 			    grp->sg_threadcount > grp->sg_minthreads))
1209 				error = cv_timedwait_sig(&st->st_cond,
1210 				    &grp->sg_lock, 5 * hz);
1211 			else
1212 				error = cv_wait_sig(&st->st_cond,
1213 				    &grp->sg_lock);
1214 			if (st->st_xprt == NULL)
1215 				LIST_REMOVE(st, st_ilink);
1216 
1217 			/*
1218 			 * Reduce worker thread count when idle.
1219 			 */
1220 			if (error == EWOULDBLOCK) {
1221 				if (!ismaster
1222 				    && (grp->sg_threadcount
1223 					> grp->sg_minthreads)
1224 					&& !st->st_xprt)
1225 					break;
1226 			} else if (error != 0) {
1227 				KASSERT(error == EINTR || error == ERESTART,
1228 				    ("non-signal error %d", error));
1229 				mtx_unlock(&grp->sg_lock);
1230 				p = curproc;
1231 				PROC_LOCK(p);
1232 				if (P_SHOULDSTOP(p) ||
1233 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1234 					thread_suspend_check(0);
1235 					PROC_UNLOCK(p);
1236 					mtx_lock(&grp->sg_lock);
1237 				} else {
1238 					PROC_UNLOCK(p);
1239 					svc_exit(pool);
1240 					mtx_lock(&grp->sg_lock);
1241 					break;
1242 				}
1243 			}
1244 			continue;
1245 		}
1246 		mtx_unlock(&grp->sg_lock);
1247 
1248 		/*
1249 		 * Drain the transport socket and queue up any RPCs.
1250 		 */
1251 		xprt->xp_lastactive = time_uptime;
1252 		do {
1253 			if (!svc_request_space_available(pool))
1254 				break;
1255 			rqstp = NULL;
1256 			stat = svc_getreq(xprt, &rqstp);
1257 			if (rqstp) {
1258 				svc_change_space_used(pool, rqstp->rq_size);
1259 				/*
1260 				 * See if the application has a preference
1261 				 * for some other thread.
1262 				 */
1263 				if (pool->sp_assign) {
1264 					stpref = pool->sp_assign(st, rqstp);
1265 					rqstp->rq_thread = stpref;
1266 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1267 					    rqstp, rq_link);
1268 					mtx_unlock(&stpref->st_lock);
1269 					if (stpref != st)
1270 						rqstp = NULL;
1271 				} else {
1272 					rqstp->rq_thread = st;
1273 					STAILQ_INSERT_TAIL(&st->st_reqs,
1274 					    rqstp, rq_link);
1275 				}
1276 			}
1277 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1278 		    && grp->sg_state != SVCPOOL_CLOSING);
1279 
1280 		/*
1281 		 * Move this transport to the end of the active list to
1282 		 * ensure fairness when multiple transports are active.
1283 		 * If this was the last queued request, svc_getreq will end
1284 		 * up calling xprt_inactive to remove from the active list.
1285 		 */
1286 		mtx_lock(&grp->sg_lock);
1287 		xprt->xp_thread = NULL;
1288 		st->st_xprt = NULL;
1289 		if (xprt->xp_active) {
1290 			if (!svc_request_space_available(pool) ||
1291 			    !xprt_assignthread(xprt))
1292 				TAILQ_INSERT_TAIL(&grp->sg_active,
1293 				    xprt, xp_alink);
1294 		}
1295 		mtx_unlock(&grp->sg_lock);
1296 		SVC_RELEASE(xprt);
1297 
1298 		/*
1299 		 * Execute what we have queued.
1300 		 */
1301 		mtx_lock(&st->st_lock);
1302 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1303 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1304 			mtx_unlock(&st->st_lock);
1305 			sz = (long)rqstp->rq_size;
1306 			svc_executereq(rqstp);
1307 			svc_change_space_used(pool, -sz);
1308 			mtx_lock(&st->st_lock);
1309 		}
1310 		mtx_unlock(&st->st_lock);
1311 		mtx_lock(&grp->sg_lock);
1312 	}
1313 
1314 	if (st->st_xprt) {
1315 		xprt = st->st_xprt;
1316 		st->st_xprt = NULL;
1317 		SVC_RELEASE(xprt);
1318 	}
1319 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1320 	mtx_destroy(&st->st_lock);
1321 	cv_destroy(&st->st_cond);
1322 	mem_free(st, sizeof(*st));
1323 
1324 	grp->sg_threadcount--;
1325 	if (!ismaster)
1326 		wakeup(grp);
1327 	mtx_unlock(&grp->sg_lock);
1328 }
1329 
1330 static void
1331 svc_thread_start(void *arg)
1332 {
1333 
1334 	svc_run_internal((SVCGROUP *) arg, FALSE);
1335 	kthread_exit();
1336 }
1337 
1338 static void
1339 svc_new_thread(SVCGROUP *grp)
1340 {
1341 	SVCPOOL *pool = grp->sg_pool;
1342 	struct thread *td;
1343 
1344 	mtx_lock(&grp->sg_lock);
1345 	grp->sg_threadcount++;
1346 	mtx_unlock(&grp->sg_lock);
1347 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1348 	    "%s: service", pool->sp_name);
1349 }
1350 
1351 void
1352 svc_run(SVCPOOL *pool)
1353 {
1354 	int g, i;
1355 	struct proc *p;
1356 	struct thread *td;
1357 	SVCGROUP *grp;
1358 
1359 	p = curproc;
1360 	td = curthread;
1361 	snprintf(td->td_name, sizeof(td->td_name),
1362 	    "%s: master", pool->sp_name);
1363 	pool->sp_state = SVCPOOL_ACTIVE;
1364 	pool->sp_proc = p;
1365 
1366 	/* Choose group count based on number of threads and CPUs. */
1367 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1368 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1369 	for (g = 0; g < pool->sp_groupcount; g++) {
1370 		grp = &pool->sp_groups[g];
1371 		grp->sg_minthreads = max(1,
1372 		    pool->sp_minthreads / pool->sp_groupcount);
1373 		grp->sg_maxthreads = max(1,
1374 		    pool->sp_maxthreads / pool->sp_groupcount);
1375 		grp->sg_lastcreatetime = time_uptime;
1376 	}
1377 
1378 	/* Starting threads */
1379 	pool->sp_groups[0].sg_threadcount++;
1380 	for (g = 0; g < pool->sp_groupcount; g++) {
1381 		grp = &pool->sp_groups[g];
1382 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1383 			svc_new_thread(grp);
1384 	}
1385 	svc_run_internal(&pool->sp_groups[0], TRUE);
1386 
1387 	/* Waiting for threads to stop. */
1388 	for (g = 0; g < pool->sp_groupcount; g++) {
1389 		grp = &pool->sp_groups[g];
1390 		mtx_lock(&grp->sg_lock);
1391 		while (grp->sg_threadcount > 0)
1392 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1393 		mtx_unlock(&grp->sg_lock);
1394 	}
1395 }
1396 
1397 void
1398 svc_exit(SVCPOOL *pool)
1399 {
1400 	SVCGROUP *grp;
1401 	SVCTHREAD *st;
1402 	int g;
1403 
1404 	pool->sp_state = SVCPOOL_CLOSING;
1405 	for (g = 0; g < pool->sp_groupcount; g++) {
1406 		grp = &pool->sp_groups[g];
1407 		mtx_lock(&grp->sg_lock);
1408 		if (grp->sg_state != SVCPOOL_CLOSING) {
1409 			grp->sg_state = SVCPOOL_CLOSING;
1410 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1411 				cv_signal(&st->st_cond);
1412 		}
1413 		mtx_unlock(&grp->sg_lock);
1414 	}
1415 }
1416 
1417 bool_t
1418 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1419 {
1420 	struct mbuf *m;
1421 	XDR xdrs;
1422 	bool_t stat;
1423 
1424 	m = rqstp->rq_args;
1425 	rqstp->rq_args = NULL;
1426 
1427 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1428 	stat = xargs(&xdrs, args);
1429 	XDR_DESTROY(&xdrs);
1430 
1431 	return (stat);
1432 }
1433 
1434 bool_t
1435 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1436 {
1437 	XDR xdrs;
1438 
1439 	if (rqstp->rq_addr) {
1440 		free(rqstp->rq_addr, M_SONAME);
1441 		rqstp->rq_addr = NULL;
1442 	}
1443 
1444 	xdrs.x_op = XDR_FREE;
1445 	return (xargs(&xdrs, args));
1446 }
1447 
1448 void
1449 svc_freereq(struct svc_req *rqstp)
1450 {
1451 	SVCTHREAD *st;
1452 	SVCPOOL *pool;
1453 
1454 	st = rqstp->rq_thread;
1455 	if (st) {
1456 		pool = st->st_pool;
1457 		if (pool->sp_done)
1458 			pool->sp_done(st, rqstp);
1459 	}
1460 
1461 	if (rqstp->rq_auth.svc_ah_ops)
1462 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1463 
1464 	if (rqstp->rq_xprt) {
1465 		SVC_RELEASE(rqstp->rq_xprt);
1466 	}
1467 
1468 	if (rqstp->rq_addr)
1469 		free(rqstp->rq_addr, M_SONAME);
1470 
1471 	if (rqstp->rq_args)
1472 		m_freem(rqstp->rq_args);
1473 
1474 	free(rqstp, M_RPC);
1475 }
1476