xref: /freebsd/sys/rpc/svc.c (revision f126890ac5386406dadf7c4cfa9566cbb56537c5)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include <sys/cdefs.h>
34 /*
35  * svc.c, Server-side remote procedure call interface.
36  *
37  * There are two sets of procedures here.  The xprt routines are
38  * for handling transport handles.  The svc routines handle the
39  * list of service routines.
40  *
41  * Copyright (C) 1984, Sun Microsystems, Inc.
42  */
43 
44 #include <sys/param.h>
45 #include <sys/jail.h>
46 #include <sys/lock.h>
47 #include <sys/kernel.h>
48 #include <sys/kthread.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/protosw.h>
54 #include <sys/queue.h>
55 #include <sys/socketvar.h>
56 #include <sys/systm.h>
57 #include <sys/smp.h>
58 #include <sys/sx.h>
59 #include <sys/ucred.h>
60 
61 #include <netinet/tcp.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
69 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71 
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73     char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
79 
80 /* ***************  SVCXPRT related stuff **************** */
81 
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85 
/*
 * Allocate and initialize a new service pool together with its thread
 * groups.  "name" is stored by reference (not copied) and must remain
 * valid for the pool's lifetime.  If "sysctl_base" is non-NULL and we
 * are in the default VNET, sysctl nodes for tuning and monitoring the
 * pool are attached under it.
 */
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	/* Initialize every group up front, even though only sp_groupcount
	 * of them are used initially. */
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}
177 
/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.  All transports are unregistered (collected
 * under each group lock, then shut down and released after the lock is
 * dropped) and all service and connection-loss callouts are removed.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	/* Phase 1: unregister everything, deferring the socket work. */
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	/* Phase 2: shut down and release the collected transports unlocked. */
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	/*
	 * The lock is dropped around each unregister call because
	 * svc_unreg()/svc_loss_unreg() take sp_lock themselves.
	 */
	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}
222 
223 void
224 svcpool_destroy(SVCPOOL *pool)
225 {
226 	SVCGROUP *grp;
227 	int g;
228 
229 	svcpool_cleanup(pool);
230 
231 	for (g = 0; g < SVC_MAXGROUPS; g++) {
232 		grp = &pool->sp_groups[g];
233 		mtx_destroy(&grp->sg_lock);
234 	}
235 	mtx_destroy(&pool->sp_lock);
236 
237 	if (pool->sp_rcache)
238 		replay_freecache(pool->sp_rcache);
239 
240 	sysctl_ctx_free(&pool->sp_sysctl);
241 	free(pool, M_RPC);
242 }
243 
244 /*
245  * Similar to svcpool_destroy(), except that it does not destroy the actual
246  * data structures.  As such, "pool" may be used again.
247  */
248 void
249 svcpool_close(SVCPOOL *pool)
250 {
251 	SVCGROUP *grp;
252 	int g;
253 
254 	svcpool_cleanup(pool);
255 
256 	/* Now, initialize the pool's state for a fresh svc_run() call. */
257 	mtx_lock(&pool->sp_lock);
258 	pool->sp_state = SVCPOOL_INIT;
259 	mtx_unlock(&pool->sp_lock);
260 	for (g = 0; g < SVC_MAXGROUPS; g++) {
261 		grp = &pool->sp_groups[g];
262 		mtx_lock(&grp->sg_lock);
263 		grp->sg_state = SVCPOOL_ACTIVE;
264 		mtx_unlock(&grp->sg_lock);
265 	}
266 }
267 
268 /*
269  * Sysctl handler to get the present thread count on a pool
270  */
271 static int
272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
273 {
274 	SVCPOOL *pool;
275 	int threads, error, g;
276 
277 	pool = oidp->oid_arg1;
278 	threads = 0;
279 	mtx_lock(&pool->sp_lock);
280 	for (g = 0; g < pool->sp_groupcount; g++)
281 		threads += pool->sp_groups[g].sg_threadcount;
282 	mtx_unlock(&pool->sp_lock);
283 	error = sysctl_handle_int(oidp, &threads, 0, req);
284 	return (error);
285 }
286 
287 /*
288  * Sysctl handler to set the minimum thread count on a pool
289  */
290 static int
291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
292 {
293 	SVCPOOL *pool;
294 	int newminthreads, error, g;
295 
296 	pool = oidp->oid_arg1;
297 	newminthreads = pool->sp_minthreads;
298 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
299 	if (error == 0 && newminthreads != pool->sp_minthreads) {
300 		if (newminthreads > pool->sp_maxthreads)
301 			return (EINVAL);
302 		mtx_lock(&pool->sp_lock);
303 		pool->sp_minthreads = newminthreads;
304 		for (g = 0; g < pool->sp_groupcount; g++) {
305 			pool->sp_groups[g].sg_minthreads = max(1,
306 			    pool->sp_minthreads / pool->sp_groupcount);
307 		}
308 		mtx_unlock(&pool->sp_lock);
309 	}
310 	return (error);
311 }
312 
313 /*
314  * Sysctl handler to set the maximum thread count on a pool
315  */
316 static int
317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
318 {
319 	SVCPOOL *pool;
320 	int newmaxthreads, error, g;
321 
322 	pool = oidp->oid_arg1;
323 	newmaxthreads = pool->sp_maxthreads;
324 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
325 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
326 		if (newmaxthreads < pool->sp_minthreads)
327 			return (EINVAL);
328 		mtx_lock(&pool->sp_lock);
329 		pool->sp_maxthreads = newmaxthreads;
330 		for (g = 0; g < pool->sp_groupcount; g++) {
331 			pool->sp_groups[g].sg_maxthreads = max(1,
332 			    pool->sp_maxthreads / pool->sp_groupcount);
333 		}
334 		mtx_unlock(&pool->sp_lock);
335 	}
336 	return (error);
337 }
338 
339 /*
340  * Activate a transport handle.
341  */
342 void
343 xprt_register(SVCXPRT *xprt)
344 {
345 	SVCPOOL *pool = xprt->xp_pool;
346 	SVCGROUP *grp;
347 	int g;
348 
349 	SVC_ACQUIRE(xprt);
350 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
351 	xprt->xp_group = grp = &pool->sp_groups[g];
352 	mtx_lock(&grp->sg_lock);
353 	xprt->xp_registered = TRUE;
354 	xprt->xp_active = FALSE;
355 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
356 	mtx_unlock(&grp->sg_lock);
357 }
358 
/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	/* Pull it off sg_active first, then off the transport list. */
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
376 
/*
 * De-activate and unregister a transport handle.  Shuts down the send
 * side of the socket (if any) and drops the reference that
 * xprt_register() took.  Safe to call concurrently; only the first
 * caller does the work.
 */
void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&grp->sg_lock);

	if (xprt->xp_socket != NULL)
		soshutdown(xprt->xp_socket, SHUT_WR);
	SVC_RELEASE(xprt);
}
395 
/*
 * Attempt to assign a service thread to this transport.  Returns TRUE
 * if an idle thread was found and woken; FALSE otherwise (possibly
 * after requesting that a new thread be created).  Called with the
 * group lock held.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		/* Hand the transport (with a new reference) to the thread. */
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}
431 
/*
 * Mark a transport as having work pending.  If no service thread is
 * attached and none can be assigned right now (either because request
 * space is throttled or no thread is available), queue the transport
 * on the group's active list for later assignment.
 */
void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&grp->sg_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(xprt->xp_pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}
459 
/*
 * Clear a transport's active flag with the group lock held.  The
 * transport sits on sg_active only while active and not owned by a
 * thread, hence the conditional removal.
 */
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}
472 
473 void
474 xprt_inactive(SVCXPRT *xprt)
475 {
476 	SVCGROUP *grp = xprt->xp_group;
477 
478 	mtx_lock(&grp->sg_lock);
479 	xprt_inactive_locked(xprt);
480 	mtx_unlock(&grp->sg_lock);
481 }
482 
/*
 * Variant of xprt_inactive() for use only when sure that port is
 * assigned to thread. For example, within receive handlers.  No
 * locking is done; per the comment above, this relies on the calling
 * thread owning the transport (enforced by the KASSERT).
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
495 
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when a rpc request for this
 * program number comes in.  Returns FALSE if a conflicting
 * registration already exists or memory could not be allocated;
 * otherwise registers the program with the local rpcbind service
 * (when "nconf" is supplied) and returns the result of that call.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	/* Prefer the transport's netid, falling back to the netconfig's. */
	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		/* Same dispatcher: treat as registering another transport. */
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xptr */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	/* M_NOWAIT: we cannot sleep while holding sp_lock. */
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
565 
566 /*
567  * Remove a service program from the callout list.
568  */
569 void
570 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
571 {
572 	struct svc_callout *s;
573 
574 	/* unregister the information anyway */
575 	(void) rpcb_unset(prog, vers, NULL);
576 	mtx_lock(&pool->sp_lock);
577 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
578 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
579 		if (s->sc_netid)
580 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
581 		mem_free(s, sizeof (struct svc_callout));
582 	}
583 	mtx_unlock(&pool->sp_lock);
584 }
585 
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some transport in this
 * pool dies.  Returns TRUE on success or if the dispatcher was
 * already registered; FALSE only on allocation failure.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	/* Registering the same dispatcher twice is a no-op. */
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	/* M_NOWAIT: we cannot sleep while holding sp_lock. */
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
615 
616 /*
617  * Remove a service connection loss program from the callout list.
618  */
619 void
620 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
621 {
622 	struct svc_loss_callout *s;
623 
624 	mtx_lock(&pool->sp_lock);
625 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
626 		if (s->slc_dispatch == dispatch) {
627 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
628 			free(s, M_RPC);
629 			break;
630 		}
631 	}
632 	mtx_unlock(&pool->sp_lock);
633 }
634 
635 /* ********************** CALLOUT list related stuff ************* */
636 
637 /*
638  * Search the callout list for a program number, return the callout
639  * struct.
640  */
641 static struct svc_callout *
642 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
643 {
644 	struct svc_callout *s;
645 
646 	mtx_assert(&pool->sp_lock, MA_OWNED);
647 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
648 		if (s->sc_prog == prog && s->sc_vers == vers
649 		    && (netid == NULL || s->sc_netid == NULL ||
650 			strcmp(netid, s->sc_netid) == 0))
651 			break;
652 	}
653 
654 	return (s);
655 }
656 
657 /* ******************* REPLY GENERATION ROUTINES  ************ */
658 
/*
 * Common reply transmission path: free any remaining request
 * arguments, record the reply in the replay cache (if enabled), wrap
 * the body through the authentication layer and hand it to the
 * transport.  Consumes "body" and rqstp->rq_addr.  Returns FALSE if
 * wrapping or transmission fails.
 */
static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	/* The auth layer may transform (e.g. encrypt) the reply body. */
	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}
686 
/*
 * Send a reply to an rpc request.  The results are XDR-encoded by
 * "xdr_results" from "xdr_location" into an mbuf chain and sent as an
 * ACCEPTED/SUCCESS reply.  Returns FALSE if encoding or transmission
 * fails.
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	/* Build an ACCEPTED/SUCCESS reply header for this xid. */
	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	/* Encode the results into an mbuf chain via an XDR stream. */
	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
718 
719 bool_t
720 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
721 {
722 	struct rpc_msg rply;
723 
724 	rply.rm_xid = rqstp->rq_xid;
725 	rply.rm_direction = REPLY;
726 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 	rply.acpted_rply.ar_stat = SUCCESS;
729 	rply.acpted_rply.ar_results.where = NULL;
730 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
731 
732 	return (svc_sendreply_common(rqstp, &rply, m));
733 }
734 
735 /*
736  * No procedure error reply
737  */
738 void
739 svcerr_noproc(struct svc_req *rqstp)
740 {
741 	SVCXPRT *xprt = rqstp->rq_xprt;
742 	struct rpc_msg rply;
743 
744 	rply.rm_xid = rqstp->rq_xid;
745 	rply.rm_direction = REPLY;
746 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
747 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
748 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
749 
750 	if (xprt->xp_pool->sp_rcache)
751 		replay_setreply(xprt->xp_pool->sp_rcache,
752 		    &rply, svc_getrpccaller(rqstp), NULL);
753 
754 	svc_sendreply_common(rqstp, &rply, NULL);
755 }
756 
/*
 * Can't decode args error reply: reject the call with GARBAGE_ARGS.
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		/*
		 * NOTE(review): unlike the other svcerr_*() routines this
		 * passes xp_rtaddr rather than svc_getrpccaller(rqstp);
		 * presumably deliberate - confirm before unifying.
		 */
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
778 
779 /*
780  * Some system error
781  */
782 void
783 svcerr_systemerr(struct svc_req *rqstp)
784 {
785 	SVCXPRT *xprt = rqstp->rq_xprt;
786 	struct rpc_msg rply;
787 
788 	rply.rm_xid = rqstp->rq_xid;
789 	rply.rm_direction = REPLY;
790 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
791 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
792 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
793 
794 	if (xprt->xp_pool->sp_rcache)
795 		replay_setreply(xprt->xp_pool->sp_rcache,
796 		    &rply, svc_getrpccaller(rqstp), NULL);
797 
798 	svc_sendreply_common(rqstp, &rply, NULL);
799 }
800 
801 /*
802  * Authentication error reply
803  */
804 void
805 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
806 {
807 	SVCXPRT *xprt = rqstp->rq_xprt;
808 	struct rpc_msg rply;
809 
810 	rply.rm_xid = rqstp->rq_xid;
811 	rply.rm_direction = REPLY;
812 	rply.rm_reply.rp_stat = MSG_DENIED;
813 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
814 	rply.rjcted_rply.rj_why = why;
815 
816 	if (xprt->xp_pool->sp_rcache)
817 		replay_setreply(xprt->xp_pool->sp_rcache,
818 		    &rply, svc_getrpccaller(rqstp), NULL);
819 
820 	svc_sendreply_common(rqstp, &rply, NULL);
821 }
822 
/*
 * Auth too weak error reply: convenience wrapper that denies the call
 * with AUTH_TOOWEAK.
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}
832 
833 /*
834  * Program unavailable error reply
835  */
836 void
837 svcerr_noprog(struct svc_req *rqstp)
838 {
839 	SVCXPRT *xprt = rqstp->rq_xprt;
840 	struct rpc_msg rply;
841 
842 	rply.rm_xid = rqstp->rq_xid;
843 	rply.rm_direction = REPLY;
844 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
845 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
846 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
847 
848 	if (xprt->xp_pool->sp_rcache)
849 		replay_setreply(xprt->xp_pool->sp_rcache,
850 		    &rply, svc_getrpccaller(rqstp), NULL);
851 
852 	svc_sendreply_common(rqstp, &rply, NULL);
853 }
854 
855 /*
856  * Program version mismatch error reply
857  */
858 void
859 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
860 {
861 	SVCXPRT *xprt = rqstp->rq_xprt;
862 	struct rpc_msg rply;
863 
864 	rply.rm_xid = rqstp->rq_xid;
865 	rply.rm_direction = REPLY;
866 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
867 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
868 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
869 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
870 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
871 
872 	if (xprt->xp_pool->sp_rcache)
873 		replay_setreply(xprt->xp_pool->sp_rcache,
874 		    &rply, svc_getrpccaller(rqstp), NULL);
875 
876 	svc_sendreply_common(rqstp, &rply, NULL);
877 }
878 
879 /*
880  * Allocate a new server transport structure. All fields are
881  * initialized to zero and xp_p3 is initialized to point at an
882  * extension structure to hold various flags and authentication
883  * parameters.
884  */
885 SVCXPRT *
886 svc_xprt_alloc(void)
887 {
888 	SVCXPRT *xprt;
889 	SVCXPRT_EXT *ext;
890 
891 	xprt = mem_alloc(sizeof(SVCXPRT));
892 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
893 	xprt->xp_p3 = ext;
894 	refcount_init(&xprt->xp_refs, 1);
895 
896 	return (xprt);
897 }
898 
/*
 * Free a server transport structure, including its extension
 * structure (xp_p3) and group-id buffer.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}
911 
912 /* ******************* SERVER INPUT STUFF ******************* */
913 
/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 * On success a fully-parsed request is returned via *rqstp_ret
 * (ownership passes to the caller); in all other cases the request
 * is freed here.  Returns the transport's status; when the transport
 * has died, the loss callouts are run and it is unregistered.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from transport (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	/* Carve the cred/verf scratch areas out of rq_credarea. */
	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				/* Completed duplicate: resend cached reply. */
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				/* Any other replay state: drop this call. */
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		/* Let the auth layer transform (e.g. decrypt) the args. */
		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Defer enabling DDP until the first non-NULLPROC RPC
		 * is received to allow STARTTLS authentication to
		 * enable TLS offload first.
		 */
		if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC &&
		    atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) {
			if (xprt->xp_socket->so_proto->pr_protocol ==
			    IPPROTO_TCP) {
				int optval = 1;

				(void)so_setsockopt(xprt->xp_socket,
				    IPPROTO_TCP, TCP_USE_DDP, &optval,
				    sizeof(optval));
			}
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	/* Free the request unless ownership passed to the caller above. */
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	/* On transport death run the loss callouts, then unregister. */
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
1029 
/*
 * Find the registered dispatcher for a parsed request and invoke it,
 * or send the appropriate "program/version unavailable" error reply.
 * Ownership of rqstp passes to the dispatcher (which must call
 * svc_freereq()); on the error paths it is freed here.
 */
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service*/
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			/* Track the supported range for the error reply. */
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}
1074 
/*
 * Scan a group's transport list for connections that have been idle
 * longer than their idle timeout and close them.  Entered with the
 * group lock held; the lock is dropped while the idle sockets are
 * shut down and released, and reacquired before returning.
 */
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	/* Shut down the collected transports without the group lock. */
	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}
1105 
1106 static void
1107 svc_assign_waiting_sockets(SVCPOOL *pool)
1108 {
1109 	SVCGROUP *grp;
1110 	SVCXPRT *xprt;
1111 	int g;
1112 
1113 	for (g = 0; g < pool->sp_groupcount; g++) {
1114 		grp = &pool->sp_groups[g];
1115 		mtx_lock(&grp->sg_lock);
1116 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1117 			if (xprt_assignthread(xprt))
1118 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1119 			else
1120 				break;
1121 		}
1122 		mtx_unlock(&grp->sg_lock);
1123 	}
1124 }
1125 
/*
 * Account for a change (positive or negative) in the number of request
 * bytes queued in the pool and update the throttling state.  When usage
 * crosses sp_space_high the pool stops accepting new request data; when
 * it drops back below sp_space_low, throttling is lifted and waiting
 * transports are handed to idle threads.
 *
 * The counter itself is updated atomically; the threshold checks and
 * flag/statistics updates are done unlocked and may race.  That is
 * tolerated here: the thresholds are advisory and transient
 * inaccuracies are harmless.
 */
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	/* New usage after the change (fetchadd returns the old value). */
	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			/* Resume transports parked while throttled. */
			svc_assign_waiting_sockets(pool);
		}
	}
}
1146 
1147 static bool_t
1148 svc_request_space_available(SVCPOOL *pool)
1149 {
1150 
1151 	if (pool->sp_space_throttled)
1152 		return (FALSE);
1153 	return (TRUE);
1154 }
1155 
/*
 * Main service loop for one worker thread (or the master thread when
 * 'ismaster' is TRUE).  Each iteration either claims an active
 * transport, drains requests from it and executes them, or sleeps
 * waiting for work.  Threads are spawned and retired from here in
 * response to load.  Called with no locks held; the group lock is
 * acquired and dropped internally.
 */
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	/* Per-thread state: current transport and queue of pending reqs. */
	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			/*
			 * Nothing to do: sleep on our condition variable.
			 * Surplus threads and the master use a timed wait
			 * (the master so it keeps running the once-per-
			 * second idle check above; surplus threads so they
			 * can retire on timeout below).
			 */
			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			/*
			 * A waker that handed us a transport also removed
			 * us from the idle list; only unlink ourselves if
			 * no transport was assigned.
			 */
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
					&& !st->st_xprt)
					break;
			} else if (error != 0) {
				/*
				 * Interrupted by a signal.  For a process
				 * stop/suspend, cooperate and continue
				 * afterwards; for any other signal, shut
				 * the whole pool down.
				 */
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			sz = (long)rqstp->rq_size;
			/* svc_executereq() consumes rqstp. */
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	/* Shutting down: drop any transport still attached to us. */
	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	/* Wake svc_run(), which waits for all worker threads to exit. */
	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}
1351 
1352 static void
1353 svc_thread_start(void *arg)
1354 {
1355 
1356 	svc_run_internal((SVCGROUP *) arg, FALSE);
1357 	kthread_exit();
1358 }
1359 
1360 static void
1361 svc_new_thread(SVCGROUP *grp)
1362 {
1363 	SVCPOOL *pool = grp->sg_pool;
1364 	struct thread *td;
1365 
1366 	mtx_lock(&grp->sg_lock);
1367 	grp->sg_threadcount++;
1368 	mtx_unlock(&grp->sg_lock);
1369 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1370 	    "%s: service", pool->sp_name);
1371 }
1372 
/*
 * Run the RPC service.  The calling thread becomes the "master"
 * service thread for group zero; extra worker threads are created to
 * satisfy each group's minimum thread count.  Does not return until
 * svc_exit() has been called and every worker thread has terminated.
 */
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		/* Split the pool-wide thread limits across the groups. */
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	/* The calling thread itself serves as group 0's first thread. */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}
1418 
1419 void
1420 svc_exit(SVCPOOL *pool)
1421 {
1422 	SVCGROUP *grp;
1423 	SVCTHREAD *st;
1424 	int g;
1425 
1426 	pool->sp_state = SVCPOOL_CLOSING;
1427 	for (g = 0; g < pool->sp_groupcount; g++) {
1428 		grp = &pool->sp_groups[g];
1429 		mtx_lock(&grp->sg_lock);
1430 		if (grp->sg_state != SVCPOOL_CLOSING) {
1431 			grp->sg_state = SVCPOOL_CLOSING;
1432 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1433 				cv_signal(&st->st_cond);
1434 		}
1435 		mtx_unlock(&grp->sg_lock);
1436 	}
1437 }
1438 
1439 bool_t
1440 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1441 {
1442 	struct mbuf *m;
1443 	XDR xdrs;
1444 	bool_t stat;
1445 
1446 	m = rqstp->rq_args;
1447 	rqstp->rq_args = NULL;
1448 
1449 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1450 	stat = xargs(&xdrs, args);
1451 	XDR_DESTROY(&xdrs);
1452 
1453 	return (stat);
1454 }
1455 
1456 bool_t
1457 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1458 {
1459 	XDR xdrs;
1460 
1461 	if (rqstp->rq_addr) {
1462 		free(rqstp->rq_addr, M_SONAME);
1463 		rqstp->rq_addr = NULL;
1464 	}
1465 
1466 	xdrs.x_op = XDR_FREE;
1467 	return (xargs(&xdrs, args));
1468 }
1469 
1470 void
1471 svc_freereq(struct svc_req *rqstp)
1472 {
1473 	SVCTHREAD *st;
1474 	SVCPOOL *pool;
1475 
1476 	st = rqstp->rq_thread;
1477 	if (st) {
1478 		pool = st->st_pool;
1479 		if (pool->sp_done)
1480 			pool->sp_done(st, rqstp);
1481 	}
1482 
1483 	if (rqstp->rq_auth.svc_ah_ops)
1484 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1485 
1486 	if (rqstp->rq_xprt) {
1487 		SVC_RELEASE(rqstp->rq_xprt);
1488 	}
1489 
1490 	if (rqstp->rq_addr)
1491 		free(rqstp->rq_addr, M_SONAME);
1492 
1493 	if (rqstp->rq_args)
1494 		m_freem(rqstp->rq_args);
1495 
1496 	free(rqstp, M_RPC);
1497 }
1498