xref: /freebsd/sys/rpc/svc.c (revision 22cf89c938886d14f5796fc49f9f020c23ea8eaf)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/jail.h>
50 #include <sys/lock.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
54 #include <sys/mbuf.h>
55 #include <sys/mutex.h>
56 #include <sys/proc.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/smp.h>
61 #include <sys/sx.h>
62 #include <sys/ucred.h>
63 
64 #include <rpc/rpc.h>
65 #include <rpc/rpcb_clnt.h>
66 #include <rpc/replay.h>
67 
68 #include <rpc/rpc_com.h>
69 
/*
 * Flag bit kept in the transport extension's xp_flags word, and the
 * predicate that tests it: version_keepquiet(xp) is non-zero when
 * SVC_VERSQUIET is set in SVC_EXT(xp)->xp_flags.
 */
#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
72 
/* Forward declarations of file-local (static) helpers. */
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
static void svcpool_cleanup(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

/* Sysctl handlers that svcpool_create() attaches to the pool's sysctl tree. */
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
86 
87 SVCPOOL*
88 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
89 {
90 	SVCPOOL *pool;
91 	SVCGROUP *grp;
92 	int g;
93 
94 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
95 
96 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
97 	pool->sp_name = name;
98 	pool->sp_state = SVCPOOL_INIT;
99 	pool->sp_proc = NULL;
100 	TAILQ_INIT(&pool->sp_callouts);
101 	TAILQ_INIT(&pool->sp_lcallouts);
102 	pool->sp_minthreads = 1;
103 	pool->sp_maxthreads = 1;
104 	pool->sp_groupcount = 1;
105 	for (g = 0; g < SVC_MAXGROUPS; g++) {
106 		grp = &pool->sp_groups[g];
107 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
108 		grp->sg_pool = pool;
109 		grp->sg_state = SVCPOOL_ACTIVE;
110 		TAILQ_INIT(&grp->sg_xlist);
111 		TAILQ_INIT(&grp->sg_active);
112 		LIST_INIT(&grp->sg_idlethreads);
113 		grp->sg_minthreads = 1;
114 		grp->sg_maxthreads = 1;
115 	}
116 
117 	/*
118 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
119 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
120 	 * on LP64 architectures, so cast to u_long to avoid undefined
121 	 * behavior.  (ILP32 architectures cannot have nmbclusters
122 	 * large enough to overflow for other reasons.)
123 	 */
124 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
125 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
126 
127 	sysctl_ctx_init(&pool->sp_sysctl);
128 	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
129 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
131 		    pool, 0, svcpool_minthread_sysctl, "I",
132 		    "Minimal number of threads");
133 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
134 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
135 		    pool, 0, svcpool_maxthread_sysctl, "I",
136 		    "Maximal number of threads");
137 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
138 		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
139 		    pool, 0, svcpool_threads_sysctl, "I",
140 		    "Current number of threads");
141 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
143 		    "Number of thread groups");
144 
145 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
146 		    "request_space_used", CTLFLAG_RD,
147 		    &pool->sp_space_used,
148 		    "Space in parsed but not handled requests.");
149 
150 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
151 		    "request_space_used_highest", CTLFLAG_RD,
152 		    &pool->sp_space_used_highest,
153 		    "Highest space used since reboot.");
154 
155 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
156 		    "request_space_high", CTLFLAG_RW,
157 		    &pool->sp_space_high,
158 		    "Maximum space in parsed but not handled requests.");
159 
160 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
161 		    "request_space_low", CTLFLAG_RW,
162 		    &pool->sp_space_low,
163 		    "Low water mark for request space.");
164 
165 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
166 		    "request_space_throttled", CTLFLAG_RD,
167 		    &pool->sp_space_throttled, 0,
168 		    "Whether nfs requests are currently throttled");
169 
170 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
171 		    "request_space_throttle_count", CTLFLAG_RD,
172 		    &pool->sp_space_throttle_count, 0,
173 		    "Count of times throttling based on request space has occurred");
174 	}
175 
176 	return pool;
177 }
178 
179 /*
180  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
181  * the pool data structures.
182  */
183 static void
184 svcpool_cleanup(SVCPOOL *pool)
185 {
186 	SVCGROUP *grp;
187 	SVCXPRT *xprt, *nxprt;
188 	struct svc_callout *s;
189 	struct svc_loss_callout *sl;
190 	struct svcxprt_list cleanup;
191 	int g;
192 
193 	TAILQ_INIT(&cleanup);
194 
195 	for (g = 0; g < SVC_MAXGROUPS; g++) {
196 		grp = &pool->sp_groups[g];
197 		mtx_lock(&grp->sg_lock);
198 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
199 			xprt_unregister_locked(xprt);
200 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
201 		}
202 		mtx_unlock(&grp->sg_lock);
203 	}
204 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
205 		if (xprt->xp_socket != NULL)
206 			soshutdown(xprt->xp_socket, SHUT_WR);
207 		SVC_RELEASE(xprt);
208 	}
209 
210 	mtx_lock(&pool->sp_lock);
211 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
212 		mtx_unlock(&pool->sp_lock);
213 		svc_unreg(pool, s->sc_prog, s->sc_vers);
214 		mtx_lock(&pool->sp_lock);
215 	}
216 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
217 		mtx_unlock(&pool->sp_lock);
218 		svc_loss_unreg(pool, sl->slc_dispatch);
219 		mtx_lock(&pool->sp_lock);
220 	}
221 	mtx_unlock(&pool->sp_lock);
222 }
223 
224 void
225 svcpool_destroy(SVCPOOL *pool)
226 {
227 	SVCGROUP *grp;
228 	int g;
229 
230 	svcpool_cleanup(pool);
231 
232 	for (g = 0; g < SVC_MAXGROUPS; g++) {
233 		grp = &pool->sp_groups[g];
234 		mtx_destroy(&grp->sg_lock);
235 	}
236 	mtx_destroy(&pool->sp_lock);
237 
238 	if (pool->sp_rcache)
239 		replay_freecache(pool->sp_rcache);
240 
241 	sysctl_ctx_free(&pool->sp_sysctl);
242 	free(pool, M_RPC);
243 }
244 
245 /*
246  * Similar to svcpool_destroy(), except that it does not destroy the actual
247  * data structures.  As such, "pool" may be used again.
248  */
249 void
250 svcpool_close(SVCPOOL *pool)
251 {
252 	SVCGROUP *grp;
253 	int g;
254 
255 	svcpool_cleanup(pool);
256 
257 	/* Now, initialize the pool's state for a fresh svc_run() call. */
258 	mtx_lock(&pool->sp_lock);
259 	pool->sp_state = SVCPOOL_INIT;
260 	mtx_unlock(&pool->sp_lock);
261 	for (g = 0; g < SVC_MAXGROUPS; g++) {
262 		grp = &pool->sp_groups[g];
263 		mtx_lock(&grp->sg_lock);
264 		grp->sg_state = SVCPOOL_ACTIVE;
265 		mtx_unlock(&grp->sg_lock);
266 	}
267 }
268 
269 /*
270  * Sysctl handler to get the present thread count on a pool
271  */
272 static int
273 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
274 {
275 	SVCPOOL *pool;
276 	int threads, error, g;
277 
278 	pool = oidp->oid_arg1;
279 	threads = 0;
280 	mtx_lock(&pool->sp_lock);
281 	for (g = 0; g < pool->sp_groupcount; g++)
282 		threads += pool->sp_groups[g].sg_threadcount;
283 	mtx_unlock(&pool->sp_lock);
284 	error = sysctl_handle_int(oidp, &threads, 0, req);
285 	return (error);
286 }
287 
288 /*
289  * Sysctl handler to set the minimum thread count on a pool
290  */
291 static int
292 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
293 {
294 	SVCPOOL *pool;
295 	int newminthreads, error, g;
296 
297 	pool = oidp->oid_arg1;
298 	newminthreads = pool->sp_minthreads;
299 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
300 	if (error == 0 && newminthreads != pool->sp_minthreads) {
301 		if (newminthreads > pool->sp_maxthreads)
302 			return (EINVAL);
303 		mtx_lock(&pool->sp_lock);
304 		pool->sp_minthreads = newminthreads;
305 		for (g = 0; g < pool->sp_groupcount; g++) {
306 			pool->sp_groups[g].sg_minthreads = max(1,
307 			    pool->sp_minthreads / pool->sp_groupcount);
308 		}
309 		mtx_unlock(&pool->sp_lock);
310 	}
311 	return (error);
312 }
313 
314 /*
315  * Sysctl handler to set the maximum thread count on a pool
316  */
317 static int
318 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
319 {
320 	SVCPOOL *pool;
321 	int newmaxthreads, error, g;
322 
323 	pool = oidp->oid_arg1;
324 	newmaxthreads = pool->sp_maxthreads;
325 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
326 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
327 		if (newmaxthreads < pool->sp_minthreads)
328 			return (EINVAL);
329 		mtx_lock(&pool->sp_lock);
330 		pool->sp_maxthreads = newmaxthreads;
331 		for (g = 0; g < pool->sp_groupcount; g++) {
332 			pool->sp_groups[g].sg_maxthreads = max(1,
333 			    pool->sp_maxthreads / pool->sp_groupcount);
334 		}
335 		mtx_unlock(&pool->sp_lock);
336 	}
337 	return (error);
338 }
339 
340 /*
341  * Activate a transport handle.
342  */
343 void
344 xprt_register(SVCXPRT *xprt)
345 {
346 	SVCPOOL *pool = xprt->xp_pool;
347 	SVCGROUP *grp;
348 	int g;
349 
350 	SVC_ACQUIRE(xprt);
351 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
352 	xprt->xp_group = grp = &pool->sp_groups[g];
353 	mtx_lock(&grp->sg_lock);
354 	xprt->xp_registered = TRUE;
355 	xprt->xp_active = FALSE;
356 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
357 	mtx_unlock(&grp->sg_lock);
358 }
359 
360 /*
361  * De-activate a transport handle. Note: the locked version doesn't
362  * release the transport - caller must do that after dropping the pool
363  * lock.
364  */
365 static void
366 xprt_unregister_locked(SVCXPRT *xprt)
367 {
368 	SVCGROUP *grp = xprt->xp_group;
369 
370 	mtx_assert(&grp->sg_lock, MA_OWNED);
371 	KASSERT(xprt->xp_registered == TRUE,
372 	    ("xprt_unregister_locked: not registered"));
373 	xprt_inactive_locked(xprt);
374 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
375 	xprt->xp_registered = FALSE;
376 }
377 
378 void
379 xprt_unregister(SVCXPRT *xprt)
380 {
381 	SVCGROUP *grp = xprt->xp_group;
382 
383 	mtx_lock(&grp->sg_lock);
384 	if (xprt->xp_registered == FALSE) {
385 		/* Already unregistered by another thread */
386 		mtx_unlock(&grp->sg_lock);
387 		return;
388 	}
389 	xprt_unregister_locked(xprt);
390 	mtx_unlock(&grp->sg_lock);
391 
392 	if (xprt->xp_socket != NULL)
393 		soshutdown(xprt->xp_socket, SHUT_WR);
394 	SVC_RELEASE(xprt);
395 }
396 
397 /*
398  * Attempt to assign a service thread to this transport.
399  */
400 static int
401 xprt_assignthread(SVCXPRT *xprt)
402 {
403 	SVCGROUP *grp = xprt->xp_group;
404 	SVCTHREAD *st;
405 
406 	mtx_assert(&grp->sg_lock, MA_OWNED);
407 	st = LIST_FIRST(&grp->sg_idlethreads);
408 	if (st) {
409 		LIST_REMOVE(st, st_ilink);
410 		SVC_ACQUIRE(xprt);
411 		xprt->xp_thread = st;
412 		st->st_xprt = xprt;
413 		cv_signal(&st->st_cond);
414 		return (TRUE);
415 	} else {
416 		/*
417 		 * See if we can create a new thread. The
418 		 * actual thread creation happens in
419 		 * svc_run_internal because our locking state
420 		 * is poorly defined (we are typically called
421 		 * from a socket upcall). Don't create more
422 		 * than one thread per second.
423 		 */
424 		if (grp->sg_state == SVCPOOL_ACTIVE
425 		    && grp->sg_lastcreatetime < time_uptime
426 		    && grp->sg_threadcount < grp->sg_maxthreads) {
427 			grp->sg_state = SVCPOOL_THREADWANTED;
428 		}
429 	}
430 	return (FALSE);
431 }
432 
433 void
434 xprt_active(SVCXPRT *xprt)
435 {
436 	SVCGROUP *grp = xprt->xp_group;
437 
438 	mtx_lock(&grp->sg_lock);
439 
440 	if (!xprt->xp_registered) {
441 		/*
442 		 * Race with xprt_unregister - we lose.
443 		 */
444 		mtx_unlock(&grp->sg_lock);
445 		return;
446 	}
447 
448 	if (!xprt->xp_active) {
449 		xprt->xp_active = TRUE;
450 		if (xprt->xp_thread == NULL) {
451 			if (!svc_request_space_available(xprt->xp_pool) ||
452 			    !xprt_assignthread(xprt))
453 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
454 				    xp_alink);
455 		}
456 	}
457 
458 	mtx_unlock(&grp->sg_lock);
459 }
460 
461 void
462 xprt_inactive_locked(SVCXPRT *xprt)
463 {
464 	SVCGROUP *grp = xprt->xp_group;
465 
466 	mtx_assert(&grp->sg_lock, MA_OWNED);
467 	if (xprt->xp_active) {
468 		if (xprt->xp_thread == NULL)
469 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
470 		xprt->xp_active = FALSE;
471 	}
472 }
473 
474 void
475 xprt_inactive(SVCXPRT *xprt)
476 {
477 	SVCGROUP *grp = xprt->xp_group;
478 
479 	mtx_lock(&grp->sg_lock);
480 	xprt_inactive_locked(xprt);
481 	mtx_unlock(&grp->sg_lock);
482 }
483 
484 /*
485  * Variant of xprt_inactive() for use only when sure that port is
486  * assigned to thread. For example, within receive handlers.
487  */
488 void
489 xprt_inactive_self(SVCXPRT *xprt)
490 {
491 
492 	KASSERT(xprt->xp_thread != NULL,
493 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
494 	xprt->xp_active = FALSE;
495 }
496 
497 /*
498  * Add a service program to the callout list.
499  * The dispatch routine will be called when a rpc request for this
500  * program number comes in.
501  */
502 bool_t
503 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
504     void (*dispatch)(struct svc_req *, SVCXPRT *),
505     const struct netconfig *nconf)
506 {
507 	SVCPOOL *pool = xprt->xp_pool;
508 	struct svc_callout *s;
509 	char *netid = NULL;
510 	int flag = 0;
511 
512 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
513 
514 	if (xprt->xp_netid) {
515 		netid = strdup(xprt->xp_netid, M_RPC);
516 		flag = 1;
517 	} else if (nconf && nconf->nc_netid) {
518 		netid = strdup(nconf->nc_netid, M_RPC);
519 		flag = 1;
520 	} /* must have been created with svc_raw_create */
521 	if ((netid == NULL) && (flag == 1)) {
522 		return (FALSE);
523 	}
524 
525 	mtx_lock(&pool->sp_lock);
526 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
527 		if (netid)
528 			free(netid, M_RPC);
529 		if (s->sc_dispatch == dispatch)
530 			goto rpcb_it; /* he is registering another xptr */
531 		mtx_unlock(&pool->sp_lock);
532 		return (FALSE);
533 	}
534 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
535 	if (s == NULL) {
536 		if (netid)
537 			free(netid, M_RPC);
538 		mtx_unlock(&pool->sp_lock);
539 		return (FALSE);
540 	}
541 
542 	s->sc_prog = prog;
543 	s->sc_vers = vers;
544 	s->sc_dispatch = dispatch;
545 	s->sc_netid = netid;
546 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
547 
548 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
549 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
550 
551 rpcb_it:
552 	mtx_unlock(&pool->sp_lock);
553 	/* now register the information with the local binder service */
554 	if (nconf) {
555 		bool_t dummy;
556 		struct netconfig tnc;
557 		struct netbuf nb;
558 		tnc = *nconf;
559 		nb.buf = &xprt->xp_ltaddr;
560 		nb.len = xprt->xp_ltaddr.ss_len;
561 		dummy = rpcb_set(prog, vers, &tnc, &nb);
562 		return (dummy);
563 	}
564 	return (TRUE);
565 }
566 
567 /*
568  * Remove a service program from the callout list.
569  */
570 void
571 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
572 {
573 	struct svc_callout *s;
574 
575 	/* unregister the information anyway */
576 	(void) rpcb_unset(prog, vers, NULL);
577 	mtx_lock(&pool->sp_lock);
578 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
579 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
580 		if (s->sc_netid)
581 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
582 		mem_free(s, sizeof (struct svc_callout));
583 	}
584 	mtx_unlock(&pool->sp_lock);
585 }
586 
587 /*
588  * Add a service connection loss program to the callout list.
589  * The dispatch routine will be called when some port in ths pool die.
590  */
591 bool_t
592 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
593 {
594 	SVCPOOL *pool = xprt->xp_pool;
595 	struct svc_loss_callout *s;
596 
597 	mtx_lock(&pool->sp_lock);
598 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
599 		if (s->slc_dispatch == dispatch)
600 			break;
601 	}
602 	if (s != NULL) {
603 		mtx_unlock(&pool->sp_lock);
604 		return (TRUE);
605 	}
606 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
607 	if (s == NULL) {
608 		mtx_unlock(&pool->sp_lock);
609 		return (FALSE);
610 	}
611 	s->slc_dispatch = dispatch;
612 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
613 	mtx_unlock(&pool->sp_lock);
614 	return (TRUE);
615 }
616 
617 /*
618  * Remove a service connection loss program from the callout list.
619  */
620 void
621 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
622 {
623 	struct svc_loss_callout *s;
624 
625 	mtx_lock(&pool->sp_lock);
626 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
627 		if (s->slc_dispatch == dispatch) {
628 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
629 			free(s, M_RPC);
630 			break;
631 		}
632 	}
633 	mtx_unlock(&pool->sp_lock);
634 }
635 
636 /* ********************** CALLOUT list related stuff ************* */
637 
638 /*
639  * Search the callout list for a program number, return the callout
640  * struct.
641  */
642 static struct svc_callout *
643 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
644 {
645 	struct svc_callout *s;
646 
647 	mtx_assert(&pool->sp_lock, MA_OWNED);
648 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
649 		if (s->sc_prog == prog && s->sc_vers == vers
650 		    && (netid == NULL || s->sc_netid == NULL ||
651 			strcmp(netid, s->sc_netid) == 0))
652 			break;
653 	}
654 
655 	return (s);
656 }
657 
658 /* ******************* REPLY GENERATION ROUTINES  ************ */
659 
660 static bool_t
661 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
662     struct mbuf *body)
663 {
664 	SVCXPRT *xprt = rqstp->rq_xprt;
665 	bool_t ok;
666 
667 	if (rqstp->rq_args) {
668 		m_freem(rqstp->rq_args);
669 		rqstp->rq_args = NULL;
670 	}
671 
672 	if (xprt->xp_pool->sp_rcache)
673 		replay_setreply(xprt->xp_pool->sp_rcache,
674 		    rply, svc_getrpccaller(rqstp), body);
675 
676 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
677 		return (FALSE);
678 
679 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
680 	if (rqstp->rq_addr) {
681 		free(rqstp->rq_addr, M_SONAME);
682 		rqstp->rq_addr = NULL;
683 	}
684 
685 	return (ok);
686 }
687 
688 /*
689  * Send a reply to an rpc request
690  */
691 bool_t
692 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
693 {
694 	struct rpc_msg rply;
695 	struct mbuf *m;
696 	XDR xdrs;
697 	bool_t ok;
698 
699 	rply.rm_xid = rqstp->rq_xid;
700 	rply.rm_direction = REPLY;
701 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
702 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
703 	rply.acpted_rply.ar_stat = SUCCESS;
704 	rply.acpted_rply.ar_results.where = NULL;
705 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
706 
707 	m = m_getcl(M_WAITOK, MT_DATA, 0);
708 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
709 	ok = xdr_results(&xdrs, xdr_location);
710 	XDR_DESTROY(&xdrs);
711 
712 	if (ok) {
713 		return (svc_sendreply_common(rqstp, &rply, m));
714 	} else {
715 		m_freem(m);
716 		return (FALSE);
717 	}
718 }
719 
720 bool_t
721 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
722 {
723 	struct rpc_msg rply;
724 
725 	rply.rm_xid = rqstp->rq_xid;
726 	rply.rm_direction = REPLY;
727 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
728 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
729 	rply.acpted_rply.ar_stat = SUCCESS;
730 	rply.acpted_rply.ar_results.where = NULL;
731 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
732 
733 	return (svc_sendreply_common(rqstp, &rply, m));
734 }
735 
736 /*
737  * No procedure error reply
738  */
739 void
740 svcerr_noproc(struct svc_req *rqstp)
741 {
742 	SVCXPRT *xprt = rqstp->rq_xprt;
743 	struct rpc_msg rply;
744 
745 	rply.rm_xid = rqstp->rq_xid;
746 	rply.rm_direction = REPLY;
747 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
748 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
749 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
750 
751 	if (xprt->xp_pool->sp_rcache)
752 		replay_setreply(xprt->xp_pool->sp_rcache,
753 		    &rply, svc_getrpccaller(rqstp), NULL);
754 
755 	svc_sendreply_common(rqstp, &rply, NULL);
756 }
757 
758 /*
759  * Can't decode args error reply
760  */
761 void
762 svcerr_decode(struct svc_req *rqstp)
763 {
764 	SVCXPRT *xprt = rqstp->rq_xprt;
765 	struct rpc_msg rply;
766 
767 	rply.rm_xid = rqstp->rq_xid;
768 	rply.rm_direction = REPLY;
769 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
770 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
771 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
772 
773 	if (xprt->xp_pool->sp_rcache)
774 		replay_setreply(xprt->xp_pool->sp_rcache,
775 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
776 
777 	svc_sendreply_common(rqstp, &rply, NULL);
778 }
779 
780 /*
781  * Some system error
782  */
783 void
784 svcerr_systemerr(struct svc_req *rqstp)
785 {
786 	SVCXPRT *xprt = rqstp->rq_xprt;
787 	struct rpc_msg rply;
788 
789 	rply.rm_xid = rqstp->rq_xid;
790 	rply.rm_direction = REPLY;
791 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
792 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
793 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
794 
795 	if (xprt->xp_pool->sp_rcache)
796 		replay_setreply(xprt->xp_pool->sp_rcache,
797 		    &rply, svc_getrpccaller(rqstp), NULL);
798 
799 	svc_sendreply_common(rqstp, &rply, NULL);
800 }
801 
802 /*
803  * Authentication error reply
804  */
805 void
806 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
807 {
808 	SVCXPRT *xprt = rqstp->rq_xprt;
809 	struct rpc_msg rply;
810 
811 	rply.rm_xid = rqstp->rq_xid;
812 	rply.rm_direction = REPLY;
813 	rply.rm_reply.rp_stat = MSG_DENIED;
814 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
815 	rply.rjcted_rply.rj_why = why;
816 
817 	if (xprt->xp_pool->sp_rcache)
818 		replay_setreply(xprt->xp_pool->sp_rcache,
819 		    &rply, svc_getrpccaller(rqstp), NULL);
820 
821 	svc_sendreply_common(rqstp, &rply, NULL);
822 }
823 
824 /*
825  * Auth too weak error reply
826  */
827 void
828 svcerr_weakauth(struct svc_req *rqstp)
829 {
830 
831 	svcerr_auth(rqstp, AUTH_TOOWEAK);
832 }
833 
834 /*
835  * Program unavailable error reply
836  */
837 void
838 svcerr_noprog(struct svc_req *rqstp)
839 {
840 	SVCXPRT *xprt = rqstp->rq_xprt;
841 	struct rpc_msg rply;
842 
843 	rply.rm_xid = rqstp->rq_xid;
844 	rply.rm_direction = REPLY;
845 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
846 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
847 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
848 
849 	if (xprt->xp_pool->sp_rcache)
850 		replay_setreply(xprt->xp_pool->sp_rcache,
851 		    &rply, svc_getrpccaller(rqstp), NULL);
852 
853 	svc_sendreply_common(rqstp, &rply, NULL);
854 }
855 
856 /*
857  * Program version mismatch error reply
858  */
859 void
860 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
861 {
862 	SVCXPRT *xprt = rqstp->rq_xprt;
863 	struct rpc_msg rply;
864 
865 	rply.rm_xid = rqstp->rq_xid;
866 	rply.rm_direction = REPLY;
867 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
868 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
869 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
870 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
871 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
872 
873 	if (xprt->xp_pool->sp_rcache)
874 		replay_setreply(xprt->xp_pool->sp_rcache,
875 		    &rply, svc_getrpccaller(rqstp), NULL);
876 
877 	svc_sendreply_common(rqstp, &rply, NULL);
878 }
879 
880 /*
881  * Allocate a new server transport structure. All fields are
882  * initialized to zero and xp_p3 is initialized to point at an
883  * extension structure to hold various flags and authentication
884  * parameters.
885  */
886 SVCXPRT *
887 svc_xprt_alloc(void)
888 {
889 	SVCXPRT *xprt;
890 	SVCXPRT_EXT *ext;
891 
892 	xprt = mem_alloc(sizeof(SVCXPRT));
893 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
894 	xprt->xp_p3 = ext;
895 	refcount_init(&xprt->xp_refs, 1);
896 
897 	return (xprt);
898 }
899 
900 /*
901  * Free a server transport structure.
902  */
903 void
904 svc_xprt_free(SVCXPRT *xprt)
905 {
906 
907 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
908 	/* The size argument is ignored, so 0 is ok. */
909 	mem_free(xprt->xp_gidp, 0);
910 	mem_free(xprt, sizeof(SVCXPRT));
911 }
912 
913 /* ******************* SERVER INPUT STUFF ******************* */
914 
915 /*
916  * Read RPC requests from a transport and queue them to be
917  * executed. We handle authentication and replay cache replies here.
918  * Actually dispatching the RPC is deferred till svc_executereq.
919  */
920 static enum xprt_stat
921 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
922 {
923 	SVCPOOL *pool = xprt->xp_pool;
924 	struct svc_req *r;
925 	struct rpc_msg msg;
926 	struct mbuf *args;
927 	struct svc_loss_callout *s;
928 	enum xprt_stat stat;
929 
930 	/* now receive msgs from xprtprt (support batch calls) */
931 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
932 
933 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
934 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
935 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
936 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
937 		enum auth_stat why;
938 
939 		/*
940 		 * Handle replays and authenticate before queuing the
941 		 * request to be executed.
942 		 */
943 		SVC_ACQUIRE(xprt);
944 		r->rq_xprt = xprt;
945 		if (pool->sp_rcache) {
946 			struct rpc_msg repmsg;
947 			struct mbuf *repbody;
948 			enum replay_state rs;
949 			rs = replay_find(pool->sp_rcache, &msg,
950 			    svc_getrpccaller(r), &repmsg, &repbody);
951 			switch (rs) {
952 			case RS_NEW:
953 				break;
954 			case RS_DONE:
955 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
956 				    repbody, &r->rq_reply_seq);
957 				if (r->rq_addr) {
958 					free(r->rq_addr, M_SONAME);
959 					r->rq_addr = NULL;
960 				}
961 				m_freem(args);
962 				goto call_done;
963 
964 			default:
965 				m_freem(args);
966 				goto call_done;
967 			}
968 		}
969 
970 		r->rq_xid = msg.rm_xid;
971 		r->rq_prog = msg.rm_call.cb_prog;
972 		r->rq_vers = msg.rm_call.cb_vers;
973 		r->rq_proc = msg.rm_call.cb_proc;
974 		r->rq_size = sizeof(*r) + m_length(args, NULL);
975 		r->rq_args = args;
976 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
977 			/*
978 			 * RPCSEC_GSS uses this return code
979 			 * for requests that form part of its
980 			 * context establishment protocol and
981 			 * should not be dispatched to the
982 			 * application.
983 			 */
984 			if (why != RPCSEC_GSS_NODISPATCH)
985 				svcerr_auth(r, why);
986 			goto call_done;
987 		}
988 
989 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
990 			svcerr_decode(r);
991 			goto call_done;
992 		}
993 
994 		/*
995 		 * Everything checks out, return request to caller.
996 		 */
997 		*rqstp_ret = r;
998 		r = NULL;
999 	}
1000 call_done:
1001 	if (r) {
1002 		svc_freereq(r);
1003 		r = NULL;
1004 	}
1005 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1006 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1007 			(*s->slc_dispatch)(xprt);
1008 		xprt_unregister(xprt);
1009 	}
1010 
1011 	return (stat);
1012 }
1013 
1014 static void
1015 svc_executereq(struct svc_req *rqstp)
1016 {
1017 	SVCXPRT *xprt = rqstp->rq_xprt;
1018 	SVCPOOL *pool = xprt->xp_pool;
1019 	int prog_found;
1020 	rpcvers_t low_vers;
1021 	rpcvers_t high_vers;
1022 	struct svc_callout *s;
1023 
1024 	/* now match message with a registered service*/
1025 	prog_found = FALSE;
1026 	low_vers = (rpcvers_t) -1L;
1027 	high_vers = (rpcvers_t) 0L;
1028 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1029 		if (s->sc_prog == rqstp->rq_prog) {
1030 			if (s->sc_vers == rqstp->rq_vers) {
1031 				/*
1032 				 * We hand ownership of r to the
1033 				 * dispatch method - they must call
1034 				 * svc_freereq.
1035 				 */
1036 				(*s->sc_dispatch)(rqstp, xprt);
1037 				return;
1038 			}  /* found correct version */
1039 			prog_found = TRUE;
1040 			if (s->sc_vers < low_vers)
1041 				low_vers = s->sc_vers;
1042 			if (s->sc_vers > high_vers)
1043 				high_vers = s->sc_vers;
1044 		}   /* found correct program */
1045 	}
1046 
1047 	/*
1048 	 * if we got here, the program or version
1049 	 * is not served ...
1050 	 */
1051 	if (prog_found)
1052 		svcerr_progvers(rqstp, low_vers, high_vers);
1053 	else
1054 		svcerr_noprog(rqstp);
1055 
1056 	svc_freereq(rqstp);
1057 }
1058 
1059 static void
1060 svc_checkidle(SVCGROUP *grp)
1061 {
1062 	SVCXPRT *xprt, *nxprt;
1063 	time_t timo;
1064 	struct svcxprt_list cleanup;
1065 
1066 	TAILQ_INIT(&cleanup);
1067 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1068 		/*
1069 		 * Only some transports have idle timers. Don't time
1070 		 * something out which is just waking up.
1071 		 */
1072 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1073 			continue;
1074 
1075 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1076 		if (time_uptime > timo) {
1077 			xprt_unregister_locked(xprt);
1078 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1079 		}
1080 	}
1081 
1082 	mtx_unlock(&grp->sg_lock);
1083 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1084 		soshutdown(xprt->xp_socket, SHUT_WR);
1085 		SVC_RELEASE(xprt);
1086 	}
1087 	mtx_lock(&grp->sg_lock);
1088 }
1089 
1090 static void
1091 svc_assign_waiting_sockets(SVCPOOL *pool)
1092 {
1093 	SVCGROUP *grp;
1094 	SVCXPRT *xprt;
1095 	int g;
1096 
1097 	for (g = 0; g < pool->sp_groupcount; g++) {
1098 		grp = &pool->sp_groups[g];
1099 		mtx_lock(&grp->sg_lock);
1100 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1101 			if (xprt_assignthread(xprt))
1102 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1103 			else
1104 				break;
1105 		}
1106 		mtx_unlock(&grp->sg_lock);
1107 	}
1108 }
1109 
1110 static void
1111 svc_change_space_used(SVCPOOL *pool, long delta)
1112 {
1113 	unsigned long value;
1114 
1115 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1116 	if (delta > 0) {
1117 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1118 			pool->sp_space_throttled = TRUE;
1119 			pool->sp_space_throttle_count++;
1120 		}
1121 		if (value > pool->sp_space_used_highest)
1122 			pool->sp_space_used_highest = value;
1123 	} else {
1124 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1125 			pool->sp_space_throttled = FALSE;
1126 			svc_assign_waiting_sockets(pool);
1127 		}
1128 	}
1129 }
1130 
1131 static bool_t
1132 svc_request_space_available(SVCPOOL *pool)
1133 {
1134 
1135 	if (pool->sp_space_throttled)
1136 		return (FALSE);
1137 	return (TRUE);
1138 }
1139 
1140 static void
1141 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1142 {
1143 	SVCPOOL *pool = grp->sg_pool;
1144 	SVCTHREAD *st, *stpref;
1145 	SVCXPRT *xprt;
1146 	enum xprt_stat stat;
1147 	struct svc_req *rqstp;
1148 	struct proc *p;
1149 	long sz;
1150 	int error;
1151 
1152 	st = mem_alloc(sizeof(*st));
1153 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1154 	st->st_pool = pool;
1155 	st->st_xprt = NULL;
1156 	STAILQ_INIT(&st->st_reqs);
1157 	cv_init(&st->st_cond, "rpcsvc");
1158 
1159 	mtx_lock(&grp->sg_lock);
1160 
1161 	/*
1162 	 * If we are a new thread which was spawned to cope with
1163 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1164 	 */
1165 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1166 		grp->sg_state = SVCPOOL_ACTIVE;
1167 
1168 	while (grp->sg_state != SVCPOOL_CLOSING) {
1169 		/*
1170 		 * Create new thread if requested.
1171 		 */
1172 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1173 			grp->sg_state = SVCPOOL_THREADSTARTING;
1174 			grp->sg_lastcreatetime = time_uptime;
1175 			mtx_unlock(&grp->sg_lock);
1176 			svc_new_thread(grp);
1177 			mtx_lock(&grp->sg_lock);
1178 			continue;
1179 		}
1180 
1181 		/*
1182 		 * Check for idle transports once per second.
1183 		 */
1184 		if (time_uptime > grp->sg_lastidlecheck) {
1185 			grp->sg_lastidlecheck = time_uptime;
1186 			svc_checkidle(grp);
1187 		}
1188 
1189 		xprt = st->st_xprt;
1190 		if (!xprt) {
1191 			/*
1192 			 * Enforce maxthreads count.
1193 			 */
1194 			if (!ismaster && grp->sg_threadcount >
1195 			    grp->sg_maxthreads)
1196 				break;
1197 
1198 			/*
1199 			 * Before sleeping, see if we can find an
1200 			 * active transport which isn't being serviced
1201 			 * by a thread.
1202 			 */
1203 			if (svc_request_space_available(pool) &&
1204 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1205 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1206 				SVC_ACQUIRE(xprt);
1207 				xprt->xp_thread = st;
1208 				st->st_xprt = xprt;
1209 				continue;
1210 			}
1211 
1212 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1213 			if (ismaster || (!ismaster &&
1214 			    grp->sg_threadcount > grp->sg_minthreads))
1215 				error = cv_timedwait_sig(&st->st_cond,
1216 				    &grp->sg_lock, 5 * hz);
1217 			else
1218 				error = cv_wait_sig(&st->st_cond,
1219 				    &grp->sg_lock);
1220 			if (st->st_xprt == NULL)
1221 				LIST_REMOVE(st, st_ilink);
1222 
1223 			/*
1224 			 * Reduce worker thread count when idle.
1225 			 */
1226 			if (error == EWOULDBLOCK) {
1227 				if (!ismaster
1228 				    && (grp->sg_threadcount
1229 					> grp->sg_minthreads)
1230 					&& !st->st_xprt)
1231 					break;
1232 			} else if (error != 0) {
1233 				KASSERT(error == EINTR || error == ERESTART,
1234 				    ("non-signal error %d", error));
1235 				mtx_unlock(&grp->sg_lock);
1236 				p = curproc;
1237 				PROC_LOCK(p);
1238 				if (P_SHOULDSTOP(p) ||
1239 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1240 					thread_suspend_check(0);
1241 					PROC_UNLOCK(p);
1242 					mtx_lock(&grp->sg_lock);
1243 				} else {
1244 					PROC_UNLOCK(p);
1245 					svc_exit(pool);
1246 					mtx_lock(&grp->sg_lock);
1247 					break;
1248 				}
1249 			}
1250 			continue;
1251 		}
1252 		mtx_unlock(&grp->sg_lock);
1253 
1254 		/*
1255 		 * Drain the transport socket and queue up any RPCs.
1256 		 */
1257 		xprt->xp_lastactive = time_uptime;
1258 		do {
1259 			if (!svc_request_space_available(pool))
1260 				break;
1261 			rqstp = NULL;
1262 			stat = svc_getreq(xprt, &rqstp);
1263 			if (rqstp) {
1264 				svc_change_space_used(pool, rqstp->rq_size);
1265 				/*
1266 				 * See if the application has a preference
1267 				 * for some other thread.
1268 				 */
1269 				if (pool->sp_assign) {
1270 					stpref = pool->sp_assign(st, rqstp);
1271 					rqstp->rq_thread = stpref;
1272 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1273 					    rqstp, rq_link);
1274 					mtx_unlock(&stpref->st_lock);
1275 					if (stpref != st)
1276 						rqstp = NULL;
1277 				} else {
1278 					rqstp->rq_thread = st;
1279 					STAILQ_INSERT_TAIL(&st->st_reqs,
1280 					    rqstp, rq_link);
1281 				}
1282 			}
1283 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1284 		    && grp->sg_state != SVCPOOL_CLOSING);
1285 
1286 		/*
1287 		 * Move this transport to the end of the active list to
1288 		 * ensure fairness when multiple transports are active.
1289 		 * If this was the last queued request, svc_getreq will end
1290 		 * up calling xprt_inactive to remove from the active list.
1291 		 */
1292 		mtx_lock(&grp->sg_lock);
1293 		xprt->xp_thread = NULL;
1294 		st->st_xprt = NULL;
1295 		if (xprt->xp_active) {
1296 			if (!svc_request_space_available(pool) ||
1297 			    !xprt_assignthread(xprt))
1298 				TAILQ_INSERT_TAIL(&grp->sg_active,
1299 				    xprt, xp_alink);
1300 		}
1301 		mtx_unlock(&grp->sg_lock);
1302 		SVC_RELEASE(xprt);
1303 
1304 		/*
1305 		 * Execute what we have queued.
1306 		 */
1307 		mtx_lock(&st->st_lock);
1308 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1309 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1310 			mtx_unlock(&st->st_lock);
1311 			sz = (long)rqstp->rq_size;
1312 			svc_executereq(rqstp);
1313 			svc_change_space_used(pool, -sz);
1314 			mtx_lock(&st->st_lock);
1315 		}
1316 		mtx_unlock(&st->st_lock);
1317 		mtx_lock(&grp->sg_lock);
1318 	}
1319 
1320 	if (st->st_xprt) {
1321 		xprt = st->st_xprt;
1322 		st->st_xprt = NULL;
1323 		SVC_RELEASE(xprt);
1324 	}
1325 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1326 	mtx_destroy(&st->st_lock);
1327 	cv_destroy(&st->st_cond);
1328 	mem_free(st, sizeof(*st));
1329 
1330 	grp->sg_threadcount--;
1331 	if (!ismaster)
1332 		wakeup(grp);
1333 	mtx_unlock(&grp->sg_lock);
1334 }
1335 
1336 static void
1337 svc_thread_start(void *arg)
1338 {
1339 
1340 	svc_run_internal((SVCGROUP *) arg, FALSE);
1341 	kthread_exit();
1342 }
1343 
1344 static void
1345 svc_new_thread(SVCGROUP *grp)
1346 {
1347 	SVCPOOL *pool = grp->sg_pool;
1348 	struct thread *td;
1349 
1350 	mtx_lock(&grp->sg_lock);
1351 	grp->sg_threadcount++;
1352 	mtx_unlock(&grp->sg_lock);
1353 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1354 	    "%s: service", pool->sp_name);
1355 }
1356 
1357 void
1358 svc_run(SVCPOOL *pool)
1359 {
1360 	int g, i;
1361 	struct proc *p;
1362 	struct thread *td;
1363 	SVCGROUP *grp;
1364 
1365 	p = curproc;
1366 	td = curthread;
1367 	snprintf(td->td_name, sizeof(td->td_name),
1368 	    "%s: master", pool->sp_name);
1369 	pool->sp_state = SVCPOOL_ACTIVE;
1370 	pool->sp_proc = p;
1371 
1372 	/* Choose group count based on number of threads and CPUs. */
1373 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1374 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1375 	for (g = 0; g < pool->sp_groupcount; g++) {
1376 		grp = &pool->sp_groups[g];
1377 		grp->sg_minthreads = max(1,
1378 		    pool->sp_minthreads / pool->sp_groupcount);
1379 		grp->sg_maxthreads = max(1,
1380 		    pool->sp_maxthreads / pool->sp_groupcount);
1381 		grp->sg_lastcreatetime = time_uptime;
1382 	}
1383 
1384 	/* Starting threads */
1385 	pool->sp_groups[0].sg_threadcount++;
1386 	for (g = 0; g < pool->sp_groupcount; g++) {
1387 		grp = &pool->sp_groups[g];
1388 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1389 			svc_new_thread(grp);
1390 	}
1391 	svc_run_internal(&pool->sp_groups[0], TRUE);
1392 
1393 	/* Waiting for threads to stop. */
1394 	for (g = 0; g < pool->sp_groupcount; g++) {
1395 		grp = &pool->sp_groups[g];
1396 		mtx_lock(&grp->sg_lock);
1397 		while (grp->sg_threadcount > 0)
1398 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1399 		mtx_unlock(&grp->sg_lock);
1400 	}
1401 }
1402 
1403 void
1404 svc_exit(SVCPOOL *pool)
1405 {
1406 	SVCGROUP *grp;
1407 	SVCTHREAD *st;
1408 	int g;
1409 
1410 	pool->sp_state = SVCPOOL_CLOSING;
1411 	for (g = 0; g < pool->sp_groupcount; g++) {
1412 		grp = &pool->sp_groups[g];
1413 		mtx_lock(&grp->sg_lock);
1414 		if (grp->sg_state != SVCPOOL_CLOSING) {
1415 			grp->sg_state = SVCPOOL_CLOSING;
1416 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1417 				cv_signal(&st->st_cond);
1418 		}
1419 		mtx_unlock(&grp->sg_lock);
1420 	}
1421 }
1422 
1423 bool_t
1424 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1425 {
1426 	struct mbuf *m;
1427 	XDR xdrs;
1428 	bool_t stat;
1429 
1430 	m = rqstp->rq_args;
1431 	rqstp->rq_args = NULL;
1432 
1433 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1434 	stat = xargs(&xdrs, args);
1435 	XDR_DESTROY(&xdrs);
1436 
1437 	return (stat);
1438 }
1439 
1440 bool_t
1441 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1442 {
1443 	XDR xdrs;
1444 
1445 	if (rqstp->rq_addr) {
1446 		free(rqstp->rq_addr, M_SONAME);
1447 		rqstp->rq_addr = NULL;
1448 	}
1449 
1450 	xdrs.x_op = XDR_FREE;
1451 	return (xargs(&xdrs, args));
1452 }
1453 
1454 void
1455 svc_freereq(struct svc_req *rqstp)
1456 {
1457 	SVCTHREAD *st;
1458 	SVCPOOL *pool;
1459 
1460 	st = rqstp->rq_thread;
1461 	if (st) {
1462 		pool = st->st_pool;
1463 		if (pool->sp_done)
1464 			pool->sp_done(st, rqstp);
1465 	}
1466 
1467 	if (rqstp->rq_auth.svc_ah_ops)
1468 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1469 
1470 	if (rqstp->rq_xprt) {
1471 		SVC_RELEASE(rqstp->rq_xprt);
1472 	}
1473 
1474 	if (rqstp->rq_addr)
1475 		free(rqstp->rq_addr, M_SONAME);
1476 
1477 	if (rqstp->rq_args)
1478 		m_freem(rqstp->rq_args);
1479 
1480 	free(rqstp, M_RPC);
1481 }
1482