xref: /freebsd/sys/rpc/svc.c (revision cc3f4b99653c34ae64f8a1fddea370abefef680e)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/sx.h>
60 #include <sys/ucred.h>
61 
62 #include <rpc/rpc.h>
63 #include <rpc/rpcb_clnt.h>
64 #include <rpc/replay.h>
65 
66 #include <rpc/rpc_com.h>
67 
68 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
70 
71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
72     char *);
73 static void svc_new_thread(SVCPOOL *pool);
74 static void xprt_unregister_locked(SVCXPRT *xprt);
75 static void svc_change_space_used(SVCPOOL *pool, int delta);
76 static bool_t svc_request_space_available(SVCPOOL *pool);
77 
78 /* ***************  SVCXPRT related stuff **************** */
79 
80 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
81 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
82 
83 SVCPOOL*
84 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
85 {
86 	SVCPOOL *pool;
87 
88 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
89 
90 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
91 	pool->sp_name = name;
92 	pool->sp_state = SVCPOOL_INIT;
93 	pool->sp_proc = NULL;
94 	TAILQ_INIT(&pool->sp_xlist);
95 	TAILQ_INIT(&pool->sp_active);
96 	TAILQ_INIT(&pool->sp_callouts);
97 	TAILQ_INIT(&pool->sp_lcallouts);
98 	LIST_INIT(&pool->sp_threads);
99 	LIST_INIT(&pool->sp_idlethreads);
100 	pool->sp_minthreads = 1;
101 	pool->sp_maxthreads = 1;
102 	pool->sp_threadcount = 0;
103 
104 	/*
105 	 * Don't use more than a quarter of mbuf clusters or more than
106 	 * 45Mb buffering requests.
107 	 */
108 	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
109 	if (pool->sp_space_high > 45 << 20)
110 		pool->sp_space_high = 45 << 20;
111 	pool->sp_space_low = 2 * pool->sp_space_high / 3;
112 
113 	sysctl_ctx_init(&pool->sp_sysctl);
114 	if (sysctl_base) {
115 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
117 		    pool, 0, svcpool_minthread_sysctl, "I", "");
118 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
120 		    pool, 0, svcpool_maxthread_sysctl, "I", "");
121 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122 		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
123 
124 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
125 		    "request_space_used", CTLFLAG_RD,
126 		    &pool->sp_space_used, 0,
127 		    "Space in parsed but not handled requests.");
128 
129 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130 		    "request_space_used_highest", CTLFLAG_RD,
131 		    &pool->sp_space_used_highest, 0,
132 		    "Highest space used since reboot.");
133 
134 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135 		    "request_space_high", CTLFLAG_RW,
136 		    &pool->sp_space_high, 0,
137 		    "Maximum space in parsed but not handled requests.");
138 
139 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 		    "request_space_low", CTLFLAG_RW,
141 		    &pool->sp_space_low, 0,
142 		    "Low water mark for request space.");
143 
144 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 		    "request_space_throttled", CTLFLAG_RD,
146 		    &pool->sp_space_throttled, 0,
147 		    "Whether nfs requests are currently throttled");
148 
149 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 		    "request_space_throttle_count", CTLFLAG_RD,
151 		    &pool->sp_space_throttle_count, 0,
152 		    "Count of times throttling based on request space has occurred");
153 	}
154 
155 	return pool;
156 }
157 
158 void
159 svcpool_destroy(SVCPOOL *pool)
160 {
161 	SVCXPRT *xprt, *nxprt;
162 	struct svc_callout *s;
163 	struct svc_loss_callout *sl;
164 	struct svcxprt_list cleanup;
165 
166 	TAILQ_INIT(&cleanup);
167 	mtx_lock(&pool->sp_lock);
168 
169 	while (TAILQ_FIRST(&pool->sp_xlist)) {
170 		xprt = TAILQ_FIRST(&pool->sp_xlist);
171 		xprt_unregister_locked(xprt);
172 		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
173 	}
174 
175 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
176 		mtx_unlock(&pool->sp_lock);
177 		svc_unreg(pool, s->sc_prog, s->sc_vers);
178 		mtx_lock(&pool->sp_lock);
179 	}
180 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
181 		mtx_unlock(&pool->sp_lock);
182 		svc_loss_unreg(pool, sl->slc_dispatch);
183 		mtx_lock(&pool->sp_lock);
184 	}
185 	mtx_unlock(&pool->sp_lock);
186 
187 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
188 		SVC_RELEASE(xprt);
189 	}
190 
191 	mtx_destroy(&pool->sp_lock);
192 
193 	if (pool->sp_rcache)
194 		replay_freecache(pool->sp_rcache);
195 
196 	sysctl_ctx_free(&pool->sp_sysctl);
197 	free(pool, M_RPC);
198 }
199 
200 static bool_t
201 svcpool_active(SVCPOOL *pool)
202 {
203 	enum svcpool_state state = pool->sp_state;
204 
205 	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
206 		return (FALSE);
207 	return (TRUE);
208 }
209 
210 /*
211  * Sysctl handler to set the minimum thread count on a pool
212  */
213 static int
214 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
215 {
216 	SVCPOOL *pool;
217 	int newminthreads, error, n;
218 
219 	pool = oidp->oid_arg1;
220 	newminthreads = pool->sp_minthreads;
221 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
222 	if (error == 0 && newminthreads != pool->sp_minthreads) {
223 		if (newminthreads > pool->sp_maxthreads)
224 			return (EINVAL);
225 		mtx_lock(&pool->sp_lock);
226 		if (newminthreads > pool->sp_minthreads
227 		    && svcpool_active(pool)) {
228 			/*
229 			 * If the pool is running and we are
230 			 * increasing, create some more threads now.
231 			 */
232 			n = newminthreads - pool->sp_threadcount;
233 			if (n > 0) {
234 				mtx_unlock(&pool->sp_lock);
235 				while (n--)
236 					svc_new_thread(pool);
237 				mtx_lock(&pool->sp_lock);
238 			}
239 		}
240 		pool->sp_minthreads = newminthreads;
241 		mtx_unlock(&pool->sp_lock);
242 	}
243 	return (error);
244 }
245 
246 /*
247  * Sysctl handler to set the maximum thread count on a pool
248  */
249 static int
250 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
251 {
252 	SVCPOOL *pool;
253 	SVCTHREAD *st;
254 	int newmaxthreads, error;
255 
256 	pool = oidp->oid_arg1;
257 	newmaxthreads = pool->sp_maxthreads;
258 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
259 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
260 		if (newmaxthreads < pool->sp_minthreads)
261 			return (EINVAL);
262 		mtx_lock(&pool->sp_lock);
263 		if (newmaxthreads < pool->sp_maxthreads
264 		    && svcpool_active(pool)) {
265 			/*
266 			 * If the pool is running and we are
267 			 * decreasing, wake up some idle threads to
268 			 * encourage them to exit.
269 			 */
270 			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
271 				cv_signal(&st->st_cond);
272 		}
273 		pool->sp_maxthreads = newmaxthreads;
274 		mtx_unlock(&pool->sp_lock);
275 	}
276 	return (error);
277 }
278 
279 /*
280  * Activate a transport handle.
281  */
282 void
283 xprt_register(SVCXPRT *xprt)
284 {
285 	SVCPOOL *pool = xprt->xp_pool;
286 
287 	SVC_ACQUIRE(xprt);
288 	mtx_lock(&pool->sp_lock);
289 	xprt->xp_registered = TRUE;
290 	xprt->xp_active = FALSE;
291 	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
292 	mtx_unlock(&pool->sp_lock);
293 }
294 
295 /*
296  * De-activate a transport handle. Note: the locked version doesn't
297  * release the transport - caller must do that after dropping the pool
298  * lock.
299  */
300 static void
301 xprt_unregister_locked(SVCXPRT *xprt)
302 {
303 	SVCPOOL *pool = xprt->xp_pool;
304 
305 	mtx_assert(&pool->sp_lock, MA_OWNED);
306 	KASSERT(xprt->xp_registered == TRUE,
307 	    ("xprt_unregister_locked: not registered"));
308 	xprt_inactive_locked(xprt);
309 	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
310 	xprt->xp_registered = FALSE;
311 }
312 
313 void
314 xprt_unregister(SVCXPRT *xprt)
315 {
316 	SVCPOOL *pool = xprt->xp_pool;
317 
318 	mtx_lock(&pool->sp_lock);
319 	if (xprt->xp_registered == FALSE) {
320 		/* Already unregistered by another thread */
321 		mtx_unlock(&pool->sp_lock);
322 		return;
323 	}
324 	xprt_unregister_locked(xprt);
325 	mtx_unlock(&pool->sp_lock);
326 
327 	SVC_RELEASE(xprt);
328 }
329 
330 /*
331  * Attempt to assign a service thread to this transport.
332  */
333 static int
334 xprt_assignthread(SVCXPRT *xprt)
335 {
336 	SVCPOOL *pool = xprt->xp_pool;
337 	SVCTHREAD *st;
338 
339 	mtx_assert(&pool->sp_lock, MA_OWNED);
340 	st = LIST_FIRST(&pool->sp_idlethreads);
341 	if (st) {
342 		LIST_REMOVE(st, st_ilink);
343 		st->st_idle = FALSE;
344 		SVC_ACQUIRE(xprt);
345 		xprt->xp_thread = st;
346 		st->st_xprt = xprt;
347 		cv_signal(&st->st_cond);
348 		return (TRUE);
349 	} else {
350 		/*
351 		 * See if we can create a new thread. The
352 		 * actual thread creation happens in
353 		 * svc_run_internal because our locking state
354 		 * is poorly defined (we are typically called
355 		 * from a socket upcall). Don't create more
356 		 * than one thread per second.
357 		 */
358 		if (pool->sp_state == SVCPOOL_ACTIVE
359 		    && pool->sp_lastcreatetime < time_uptime
360 		    && pool->sp_threadcount < pool->sp_maxthreads) {
361 			pool->sp_state = SVCPOOL_THREADWANTED;
362 		}
363 	}
364 	return (FALSE);
365 }
366 
367 void
368 xprt_active(SVCXPRT *xprt)
369 {
370 	SVCPOOL *pool = xprt->xp_pool;
371 
372 	mtx_lock(&pool->sp_lock);
373 
374 	if (!xprt->xp_registered) {
375 		/*
376 		 * Race with xprt_unregister - we lose.
377 		 */
378 		mtx_unlock(&pool->sp_lock);
379 		return;
380 	}
381 
382 	if (!xprt->xp_active) {
383 		xprt->xp_active = TRUE;
384 		if (xprt->xp_thread == NULL) {
385 			if (!svc_request_space_available(pool) ||
386 			    !xprt_assignthread(xprt))
387 				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
388 				    xp_alink);
389 		}
390 	}
391 
392 	mtx_unlock(&pool->sp_lock);
393 }
394 
395 void
396 xprt_inactive_locked(SVCXPRT *xprt)
397 {
398 	SVCPOOL *pool = xprt->xp_pool;
399 
400 	mtx_assert(&pool->sp_lock, MA_OWNED);
401 	if (xprt->xp_active) {
402 		if (xprt->xp_thread == NULL)
403 			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
404 		xprt->xp_active = FALSE;
405 	}
406 }
407 
408 void
409 xprt_inactive(SVCXPRT *xprt)
410 {
411 	SVCPOOL *pool = xprt->xp_pool;
412 
413 	mtx_lock(&pool->sp_lock);
414 	xprt_inactive_locked(xprt);
415 	mtx_unlock(&pool->sp_lock);
416 }
417 
418 /*
419  * Variant of xprt_inactive() for use only when sure that port is
420  * assigned to thread. For example, withing receive handlers.
421  */
422 void
423 xprt_inactive_self(SVCXPRT *xprt)
424 {
425 
426 	KASSERT(xprt->xp_thread != NULL,
427 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
428 	xprt->xp_active = FALSE;
429 }
430 
431 /*
432  * Add a service program to the callout list.
433  * The dispatch routine will be called when a rpc request for this
434  * program number comes in.
435  */
436 bool_t
437 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
438     void (*dispatch)(struct svc_req *, SVCXPRT *),
439     const struct netconfig *nconf)
440 {
441 	SVCPOOL *pool = xprt->xp_pool;
442 	struct svc_callout *s;
443 	char *netid = NULL;
444 	int flag = 0;
445 
446 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
447 
448 	if (xprt->xp_netid) {
449 		netid = strdup(xprt->xp_netid, M_RPC);
450 		flag = 1;
451 	} else if (nconf && nconf->nc_netid) {
452 		netid = strdup(nconf->nc_netid, M_RPC);
453 		flag = 1;
454 	} /* must have been created with svc_raw_create */
455 	if ((netid == NULL) && (flag == 1)) {
456 		return (FALSE);
457 	}
458 
459 	mtx_lock(&pool->sp_lock);
460 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
461 		if (netid)
462 			free(netid, M_RPC);
463 		if (s->sc_dispatch == dispatch)
464 			goto rpcb_it; /* he is registering another xptr */
465 		mtx_unlock(&pool->sp_lock);
466 		return (FALSE);
467 	}
468 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
469 	if (s == NULL) {
470 		if (netid)
471 			free(netid, M_RPC);
472 		mtx_unlock(&pool->sp_lock);
473 		return (FALSE);
474 	}
475 
476 	s->sc_prog = prog;
477 	s->sc_vers = vers;
478 	s->sc_dispatch = dispatch;
479 	s->sc_netid = netid;
480 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
481 
482 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
483 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
484 
485 rpcb_it:
486 	mtx_unlock(&pool->sp_lock);
487 	/* now register the information with the local binder service */
488 	if (nconf) {
489 		bool_t dummy;
490 		struct netconfig tnc;
491 		struct netbuf nb;
492 		tnc = *nconf;
493 		nb.buf = &xprt->xp_ltaddr;
494 		nb.len = xprt->xp_ltaddr.ss_len;
495 		dummy = rpcb_set(prog, vers, &tnc, &nb);
496 		return (dummy);
497 	}
498 	return (TRUE);
499 }
500 
501 /*
502  * Remove a service program from the callout list.
503  */
504 void
505 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
506 {
507 	struct svc_callout *s;
508 
509 	/* unregister the information anyway */
510 	(void) rpcb_unset(prog, vers, NULL);
511 	mtx_lock(&pool->sp_lock);
512 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
513 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
514 		if (s->sc_netid)
515 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
516 		mem_free(s, sizeof (struct svc_callout));
517 	}
518 	mtx_unlock(&pool->sp_lock);
519 }
520 
521 /*
522  * Add a service connection loss program to the callout list.
523  * The dispatch routine will be called when some port in ths pool die.
524  */
525 bool_t
526 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
527 {
528 	SVCPOOL *pool = xprt->xp_pool;
529 	struct svc_loss_callout *s;
530 
531 	mtx_lock(&pool->sp_lock);
532 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
533 		if (s->slc_dispatch == dispatch)
534 			break;
535 	}
536 	if (s != NULL) {
537 		mtx_unlock(&pool->sp_lock);
538 		return (TRUE);
539 	}
540 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
541 	if (s == NULL) {
542 		mtx_unlock(&pool->sp_lock);
543 		return (FALSE);
544 	}
545 	s->slc_dispatch = dispatch;
546 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
547 	mtx_unlock(&pool->sp_lock);
548 	return (TRUE);
549 }
550 
551 /*
552  * Remove a service connection loss program from the callout list.
553  */
554 void
555 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
556 {
557 	struct svc_loss_callout *s;
558 
559 	mtx_lock(&pool->sp_lock);
560 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
561 		if (s->slc_dispatch == dispatch) {
562 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
563 			free(s, M_RPC);
564 			break;
565 		}
566 	}
567 	mtx_unlock(&pool->sp_lock);
568 }
569 
570 /* ********************** CALLOUT list related stuff ************* */
571 
572 /*
573  * Search the callout list for a program number, return the callout
574  * struct.
575  */
576 static struct svc_callout *
577 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
578 {
579 	struct svc_callout *s;
580 
581 	mtx_assert(&pool->sp_lock, MA_OWNED);
582 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
583 		if (s->sc_prog == prog && s->sc_vers == vers
584 		    && (netid == NULL || s->sc_netid == NULL ||
585 			strcmp(netid, s->sc_netid) == 0))
586 			break;
587 	}
588 
589 	return (s);
590 }
591 
592 /* ******************* REPLY GENERATION ROUTINES  ************ */
593 
594 static bool_t
595 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
596     struct mbuf *body)
597 {
598 	SVCXPRT *xprt = rqstp->rq_xprt;
599 	bool_t ok;
600 
601 	if (rqstp->rq_args) {
602 		m_freem(rqstp->rq_args);
603 		rqstp->rq_args = NULL;
604 	}
605 
606 	if (xprt->xp_pool->sp_rcache)
607 		replay_setreply(xprt->xp_pool->sp_rcache,
608 		    rply, svc_getrpccaller(rqstp), body);
609 
610 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
611 		return (FALSE);
612 
613 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
614 	if (rqstp->rq_addr) {
615 		free(rqstp->rq_addr, M_SONAME);
616 		rqstp->rq_addr = NULL;
617 	}
618 
619 	return (ok);
620 }
621 
622 /*
623  * Send a reply to an rpc request
624  */
625 bool_t
626 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
627 {
628 	struct rpc_msg rply;
629 	struct mbuf *m;
630 	XDR xdrs;
631 	bool_t ok;
632 
633 	rply.rm_xid = rqstp->rq_xid;
634 	rply.rm_direction = REPLY;
635 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
636 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
637 	rply.acpted_rply.ar_stat = SUCCESS;
638 	rply.acpted_rply.ar_results.where = NULL;
639 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
640 
641 	m = m_getcl(M_WAITOK, MT_DATA, 0);
642 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
643 	ok = xdr_results(&xdrs, xdr_location);
644 	XDR_DESTROY(&xdrs);
645 
646 	if (ok) {
647 		return (svc_sendreply_common(rqstp, &rply, m));
648 	} else {
649 		m_freem(m);
650 		return (FALSE);
651 	}
652 }
653 
654 bool_t
655 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
656 {
657 	struct rpc_msg rply;
658 
659 	rply.rm_xid = rqstp->rq_xid;
660 	rply.rm_direction = REPLY;
661 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
662 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
663 	rply.acpted_rply.ar_stat = SUCCESS;
664 	rply.acpted_rply.ar_results.where = NULL;
665 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
666 
667 	return (svc_sendreply_common(rqstp, &rply, m));
668 }
669 
670 /*
671  * No procedure error reply
672  */
673 void
674 svcerr_noproc(struct svc_req *rqstp)
675 {
676 	SVCXPRT *xprt = rqstp->rq_xprt;
677 	struct rpc_msg rply;
678 
679 	rply.rm_xid = rqstp->rq_xid;
680 	rply.rm_direction = REPLY;
681 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
682 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
683 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
684 
685 	if (xprt->xp_pool->sp_rcache)
686 		replay_setreply(xprt->xp_pool->sp_rcache,
687 		    &rply, svc_getrpccaller(rqstp), NULL);
688 
689 	svc_sendreply_common(rqstp, &rply, NULL);
690 }
691 
692 /*
693  * Can't decode args error reply
694  */
695 void
696 svcerr_decode(struct svc_req *rqstp)
697 {
698 	SVCXPRT *xprt = rqstp->rq_xprt;
699 	struct rpc_msg rply;
700 
701 	rply.rm_xid = rqstp->rq_xid;
702 	rply.rm_direction = REPLY;
703 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
704 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
705 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
706 
707 	if (xprt->xp_pool->sp_rcache)
708 		replay_setreply(xprt->xp_pool->sp_rcache,
709 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
710 
711 	svc_sendreply_common(rqstp, &rply, NULL);
712 }
713 
714 /*
715  * Some system error
716  */
717 void
718 svcerr_systemerr(struct svc_req *rqstp)
719 {
720 	SVCXPRT *xprt = rqstp->rq_xprt;
721 	struct rpc_msg rply;
722 
723 	rply.rm_xid = rqstp->rq_xid;
724 	rply.rm_direction = REPLY;
725 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
726 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
727 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
728 
729 	if (xprt->xp_pool->sp_rcache)
730 		replay_setreply(xprt->xp_pool->sp_rcache,
731 		    &rply, svc_getrpccaller(rqstp), NULL);
732 
733 	svc_sendreply_common(rqstp, &rply, NULL);
734 }
735 
736 /*
737  * Authentication error reply
738  */
739 void
740 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
741 {
742 	SVCXPRT *xprt = rqstp->rq_xprt;
743 	struct rpc_msg rply;
744 
745 	rply.rm_xid = rqstp->rq_xid;
746 	rply.rm_direction = REPLY;
747 	rply.rm_reply.rp_stat = MSG_DENIED;
748 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
749 	rply.rjcted_rply.rj_why = why;
750 
751 	if (xprt->xp_pool->sp_rcache)
752 		replay_setreply(xprt->xp_pool->sp_rcache,
753 		    &rply, svc_getrpccaller(rqstp), NULL);
754 
755 	svc_sendreply_common(rqstp, &rply, NULL);
756 }
757 
758 /*
759  * Auth too weak error reply
760  */
761 void
762 svcerr_weakauth(struct svc_req *rqstp)
763 {
764 
765 	svcerr_auth(rqstp, AUTH_TOOWEAK);
766 }
767 
768 /*
769  * Program unavailable error reply
770  */
771 void
772 svcerr_noprog(struct svc_req *rqstp)
773 {
774 	SVCXPRT *xprt = rqstp->rq_xprt;
775 	struct rpc_msg rply;
776 
777 	rply.rm_xid = rqstp->rq_xid;
778 	rply.rm_direction = REPLY;
779 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
780 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
781 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
782 
783 	if (xprt->xp_pool->sp_rcache)
784 		replay_setreply(xprt->xp_pool->sp_rcache,
785 		    &rply, svc_getrpccaller(rqstp), NULL);
786 
787 	svc_sendreply_common(rqstp, &rply, NULL);
788 }
789 
790 /*
791  * Program version mismatch error reply
792  */
793 void
794 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
795 {
796 	SVCXPRT *xprt = rqstp->rq_xprt;
797 	struct rpc_msg rply;
798 
799 	rply.rm_xid = rqstp->rq_xid;
800 	rply.rm_direction = REPLY;
801 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
802 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
803 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
804 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
805 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
806 
807 	if (xprt->xp_pool->sp_rcache)
808 		replay_setreply(xprt->xp_pool->sp_rcache,
809 		    &rply, svc_getrpccaller(rqstp), NULL);
810 
811 	svc_sendreply_common(rqstp, &rply, NULL);
812 }
813 
814 /*
815  * Allocate a new server transport structure. All fields are
816  * initialized to zero and xp_p3 is initialized to point at an
817  * extension structure to hold various flags and authentication
818  * parameters.
819  */
820 SVCXPRT *
821 svc_xprt_alloc()
822 {
823 	SVCXPRT *xprt;
824 	SVCXPRT_EXT *ext;
825 
826 	xprt = mem_alloc(sizeof(SVCXPRT));
827 	memset(xprt, 0, sizeof(SVCXPRT));
828 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
829 	memset(ext, 0, sizeof(SVCXPRT_EXT));
830 	xprt->xp_p3 = ext;
831 	refcount_init(&xprt->xp_refs, 1);
832 
833 	return (xprt);
834 }
835 
836 /*
837  * Free a server transport structure.
838  */
839 void
840 svc_xprt_free(xprt)
841 	SVCXPRT *xprt;
842 {
843 
844 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
845 	mem_free(xprt, sizeof(SVCXPRT));
846 }
847 
848 /* ******************* SERVER INPUT STUFF ******************* */
849 
850 /*
851  * Read RPC requests from a transport and queue them to be
852  * executed. We handle authentication and replay cache replies here.
853  * Actually dispatching the RPC is deferred till svc_executereq.
854  */
855 static enum xprt_stat
856 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
857 {
858 	SVCPOOL *pool = xprt->xp_pool;
859 	struct svc_req *r;
860 	struct rpc_msg msg;
861 	struct mbuf *args;
862 	struct svc_loss_callout *s;
863 	enum xprt_stat stat;
864 
865 	/* now receive msgs from xprtprt (support batch calls) */
866 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
867 
868 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
869 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
870 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
871 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
872 		enum auth_stat why;
873 
874 		/*
875 		 * Handle replays and authenticate before queuing the
876 		 * request to be executed.
877 		 */
878 		SVC_ACQUIRE(xprt);
879 		r->rq_xprt = xprt;
880 		if (pool->sp_rcache) {
881 			struct rpc_msg repmsg;
882 			struct mbuf *repbody;
883 			enum replay_state rs;
884 			rs = replay_find(pool->sp_rcache, &msg,
885 			    svc_getrpccaller(r), &repmsg, &repbody);
886 			switch (rs) {
887 			case RS_NEW:
888 				break;
889 			case RS_DONE:
890 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
891 				    repbody, &r->rq_reply_seq);
892 				if (r->rq_addr) {
893 					free(r->rq_addr, M_SONAME);
894 					r->rq_addr = NULL;
895 				}
896 				m_freem(args);
897 				goto call_done;
898 
899 			default:
900 				m_freem(args);
901 				goto call_done;
902 			}
903 		}
904 
905 		r->rq_xid = msg.rm_xid;
906 		r->rq_prog = msg.rm_call.cb_prog;
907 		r->rq_vers = msg.rm_call.cb_vers;
908 		r->rq_proc = msg.rm_call.cb_proc;
909 		r->rq_size = sizeof(*r) + m_length(args, NULL);
910 		r->rq_args = args;
911 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
912 			/*
913 			 * RPCSEC_GSS uses this return code
914 			 * for requests that form part of its
915 			 * context establishment protocol and
916 			 * should not be dispatched to the
917 			 * application.
918 			 */
919 			if (why != RPCSEC_GSS_NODISPATCH)
920 				svcerr_auth(r, why);
921 			goto call_done;
922 		}
923 
924 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
925 			svcerr_decode(r);
926 			goto call_done;
927 		}
928 
929 		/*
930 		 * Everything checks out, return request to caller.
931 		 */
932 		*rqstp_ret = r;
933 		r = NULL;
934 	}
935 call_done:
936 	if (r) {
937 		svc_freereq(r);
938 		r = NULL;
939 	}
940 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
941 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
942 			(*s->slc_dispatch)(xprt);
943 		xprt_unregister(xprt);
944 	}
945 
946 	return (stat);
947 }
948 
949 static void
950 svc_executereq(struct svc_req *rqstp)
951 {
952 	SVCXPRT *xprt = rqstp->rq_xprt;
953 	SVCPOOL *pool = xprt->xp_pool;
954 	int prog_found;
955 	rpcvers_t low_vers;
956 	rpcvers_t high_vers;
957 	struct svc_callout *s;
958 
959 	/* now match message with a registered service*/
960 	prog_found = FALSE;
961 	low_vers = (rpcvers_t) -1L;
962 	high_vers = (rpcvers_t) 0L;
963 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
964 		if (s->sc_prog == rqstp->rq_prog) {
965 			if (s->sc_vers == rqstp->rq_vers) {
966 				/*
967 				 * We hand ownership of r to the
968 				 * dispatch method - they must call
969 				 * svc_freereq.
970 				 */
971 				(*s->sc_dispatch)(rqstp, xprt);
972 				return;
973 			}  /* found correct version */
974 			prog_found = TRUE;
975 			if (s->sc_vers < low_vers)
976 				low_vers = s->sc_vers;
977 			if (s->sc_vers > high_vers)
978 				high_vers = s->sc_vers;
979 		}   /* found correct program */
980 	}
981 
982 	/*
983 	 * if we got here, the program or version
984 	 * is not served ...
985 	 */
986 	if (prog_found)
987 		svcerr_progvers(rqstp, low_vers, high_vers);
988 	else
989 		svcerr_noprog(rqstp);
990 
991 	svc_freereq(rqstp);
992 }
993 
994 static void
995 svc_checkidle(SVCPOOL *pool)
996 {
997 	SVCXPRT *xprt, *nxprt;
998 	time_t timo;
999 	struct svcxprt_list cleanup;
1000 
1001 	TAILQ_INIT(&cleanup);
1002 	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
1003 		/*
1004 		 * Only some transports have idle timers. Don't time
1005 		 * something out which is just waking up.
1006 		 */
1007 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1008 			continue;
1009 
1010 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1011 		if (time_uptime > timo) {
1012 			xprt_unregister_locked(xprt);
1013 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1014 		}
1015 	}
1016 
1017 	mtx_unlock(&pool->sp_lock);
1018 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1019 		SVC_RELEASE(xprt);
1020 	}
1021 	mtx_lock(&pool->sp_lock);
1022 
1023 }
1024 
1025 static void
1026 svc_assign_waiting_sockets(SVCPOOL *pool)
1027 {
1028 	SVCXPRT *xprt;
1029 
1030 	mtx_lock(&pool->sp_lock);
1031 	while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
1032 		if (xprt_assignthread(xprt))
1033 			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1034 		else
1035 			break;
1036 	}
1037 	mtx_unlock(&pool->sp_lock);
1038 }
1039 
1040 static void
1041 svc_change_space_used(SVCPOOL *pool, int delta)
1042 {
1043 	unsigned int value;
1044 
1045 	value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
1046 	if (delta > 0) {
1047 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1048 			pool->sp_space_throttled = TRUE;
1049 			pool->sp_space_throttle_count++;
1050 		}
1051 		if (value > pool->sp_space_used_highest)
1052 			pool->sp_space_used_highest = value;
1053 	} else {
1054 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1055 			pool->sp_space_throttled = FALSE;
1056 			svc_assign_waiting_sockets(pool);
1057 		}
1058 	}
1059 }
1060 
1061 static bool_t
1062 svc_request_space_available(SVCPOOL *pool)
1063 {
1064 
1065 	if (pool->sp_space_throttled)
1066 		return (FALSE);
1067 	return (TRUE);
1068 }
1069 
1070 static void
1071 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
1072 {
1073 	struct svc_reqlist reqs;
1074 	SVCTHREAD *st, *stpref;
1075 	SVCXPRT *xprt;
1076 	enum xprt_stat stat;
1077 	struct svc_req *rqstp;
1078 	size_t sz;
1079 	int error;
1080 
1081 	st = mem_alloc(sizeof(*st));
1082 	st->st_pool = pool;
1083 	st->st_xprt = NULL;
1084 	STAILQ_INIT(&st->st_reqs);
1085 	cv_init(&st->st_cond, "rpcsvc");
1086 	STAILQ_INIT(&reqs);
1087 
1088 	mtx_lock(&pool->sp_lock);
1089 	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
1090 
1091 	/*
1092 	 * If we are a new thread which was spawned to cope with
1093 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1094 	 */
1095 	if (pool->sp_state == SVCPOOL_THREADSTARTING)
1096 		pool->sp_state = SVCPOOL_ACTIVE;
1097 
1098 	while (pool->sp_state != SVCPOOL_CLOSING) {
1099 		/*
1100 		 * Create new thread if requested.
1101 		 */
1102 		if (pool->sp_state == SVCPOOL_THREADWANTED) {
1103 			pool->sp_state = SVCPOOL_THREADSTARTING;
1104 			pool->sp_lastcreatetime = time_uptime;
1105 			mtx_unlock(&pool->sp_lock);
1106 			svc_new_thread(pool);
1107 			mtx_lock(&pool->sp_lock);
1108 			continue;
1109 		}
1110 
1111 		/*
1112 		 * Check for idle transports once per second.
1113 		 */
1114 		if (time_uptime > pool->sp_lastidlecheck) {
1115 			pool->sp_lastidlecheck = time_uptime;
1116 			svc_checkidle(pool);
1117 		}
1118 
1119 		xprt = st->st_xprt;
1120 		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
1121 			/*
1122 			 * Enforce maxthreads count.
1123 			 */
1124 			if (pool->sp_threadcount > pool->sp_maxthreads)
1125 				break;
1126 
1127 			/*
1128 			 * Before sleeping, see if we can find an
1129 			 * active transport which isn't being serviced
1130 			 * by a thread.
1131 			 */
1132 			if (svc_request_space_available(pool) &&
1133 			    (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
1134 				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1135 				SVC_ACQUIRE(xprt);
1136 				xprt->xp_thread = st;
1137 				st->st_xprt = xprt;
1138 				continue;
1139 			}
1140 
1141 			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1142 			st->st_idle = TRUE;
1143 			if (ismaster || (!ismaster &&
1144 			    pool->sp_threadcount > pool->sp_minthreads))
1145 				error = cv_timedwait_sig(&st->st_cond,
1146 				    &pool->sp_lock, 5 * hz);
1147 			else
1148 				error = cv_wait_sig(&st->st_cond,
1149 				    &pool->sp_lock);
1150 			if (st->st_idle) {
1151 				LIST_REMOVE(st, st_ilink);
1152 				st->st_idle = FALSE;
1153 			}
1154 
1155 			/*
1156 			 * Reduce worker thread count when idle.
1157 			 */
1158 			if (error == EWOULDBLOCK) {
1159 				if (!ismaster
1160 				    && (pool->sp_threadcount
1161 					> pool->sp_minthreads)
1162 					&& !st->st_xprt
1163 					&& STAILQ_EMPTY(&st->st_reqs))
1164 					break;
1165 			} else if (error) {
1166 				mtx_unlock(&pool->sp_lock);
1167 				svc_exit(pool);
1168 				mtx_lock(&pool->sp_lock);
1169 				break;
1170 			}
1171 			continue;
1172 		}
1173 
1174 		if (xprt) {
1175 			/*
1176 			 * Drain the transport socket and queue up any
1177 			 * RPCs.
1178 			 */
1179 			xprt->xp_lastactive = time_uptime;
1180 			do {
1181 				if (!svc_request_space_available(pool))
1182 					break;
1183 				mtx_unlock(&pool->sp_lock);
1184 				rqstp = NULL;
1185 				stat = svc_getreq(xprt, &rqstp);
1186 				if (rqstp) {
1187 					svc_change_space_used(pool, rqstp->rq_size);
1188 					/*
1189 					 * See if the application has
1190 					 * a preference for some other
1191 					 * thread.
1192 					 */
1193 					stpref = st;
1194 					if (pool->sp_assign)
1195 						stpref = pool->sp_assign(st,
1196 						    rqstp);
1197 					else
1198 						mtx_lock(&pool->sp_lock);
1199 
1200 					rqstp->rq_thread = stpref;
1201 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1202 					    rqstp, rq_link);
1203 
1204 					/*
1205 					 * If we assigned the request
1206 					 * to another thread, make
1207 					 * sure its awake and continue
1208 					 * reading from the
1209 					 * socket. Otherwise, try to
1210 					 * find some other thread to
1211 					 * read from the socket and
1212 					 * execute the request
1213 					 * immediately.
1214 					 */
1215 					if (stpref == st)
1216 						break;
1217 					if (stpref->st_idle) {
1218 						LIST_REMOVE(stpref, st_ilink);
1219 						stpref->st_idle = FALSE;
1220 						cv_signal(&stpref->st_cond);
1221 					}
1222 				} else
1223 					mtx_lock(&pool->sp_lock);
1224 			} while (stat == XPRT_MOREREQS
1225 			    && pool->sp_state != SVCPOOL_CLOSING);
1226 
1227 			/*
1228 			 * Move this transport to the end of the
1229 			 * active list to ensure fairness when
1230 			 * multiple transports are active. If this was
1231 			 * the last queued request, svc_getreq will
1232 			 * end up calling xprt_inactive to remove from
1233 			 * the active list.
1234 			 */
1235 			xprt->xp_thread = NULL;
1236 			st->st_xprt = NULL;
1237 			if (xprt->xp_active) {
1238 				if (!svc_request_space_available(pool) ||
1239 				    !xprt_assignthread(xprt))
1240 					TAILQ_INSERT_TAIL(&pool->sp_active,
1241 					    xprt, xp_alink);
1242 			}
1243 			STAILQ_CONCAT(&reqs, &st->st_reqs);
1244 			mtx_unlock(&pool->sp_lock);
1245 			SVC_RELEASE(xprt);
1246 		} else {
1247 			STAILQ_CONCAT(&reqs, &st->st_reqs);
1248 			mtx_unlock(&pool->sp_lock);
1249 		}
1250 
1251 		/*
1252 		 * Execute what we have queued.
1253 		 */
1254 		sz = 0;
1255 		while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
1256 			STAILQ_REMOVE_HEAD(&reqs, rq_link);
1257 			sz += rqstp->rq_size;
1258 			svc_executereq(rqstp);
1259 		}
1260 		svc_change_space_used(pool, -sz);
1261 		mtx_lock(&pool->sp_lock);
1262 	}
1263 
1264 	if (st->st_xprt) {
1265 		xprt = st->st_xprt;
1266 		st->st_xprt = NULL;
1267 		SVC_RELEASE(xprt);
1268 	}
1269 
1270 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1271 	LIST_REMOVE(st, st_link);
1272 	pool->sp_threadcount--;
1273 
1274 	mtx_unlock(&pool->sp_lock);
1275 
1276 	cv_destroy(&st->st_cond);
1277 	mem_free(st, sizeof(*st));
1278 
1279 	if (!ismaster)
1280 		wakeup(pool);
1281 }
1282 
1283 static void
1284 svc_thread_start(void *arg)
1285 {
1286 
1287 	svc_run_internal((SVCPOOL *) arg, FALSE);
1288 	kthread_exit();
1289 }
1290 
1291 static void
1292 svc_new_thread(SVCPOOL *pool)
1293 {
1294 	struct thread *td;
1295 
1296 	pool->sp_threadcount++;
1297 	kthread_add(svc_thread_start, pool,
1298 	    pool->sp_proc, &td, 0, 0,
1299 	    "%s: service", pool->sp_name);
1300 }
1301 
1302 void
1303 svc_run(SVCPOOL *pool)
1304 {
1305 	int i;
1306 	struct proc *p;
1307 	struct thread *td;
1308 
1309 	p = curproc;
1310 	td = curthread;
1311 	snprintf(td->td_name, sizeof(td->td_name),
1312 	    "%s: master", pool->sp_name);
1313 	pool->sp_state = SVCPOOL_ACTIVE;
1314 	pool->sp_proc = p;
1315 	pool->sp_lastcreatetime = time_uptime;
1316 	pool->sp_threadcount = 1;
1317 
1318 	for (i = 1; i < pool->sp_minthreads; i++) {
1319 		svc_new_thread(pool);
1320 	}
1321 
1322 	svc_run_internal(pool, TRUE);
1323 
1324 	mtx_lock(&pool->sp_lock);
1325 	while (pool->sp_threadcount > 0)
1326 		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
1327 	mtx_unlock(&pool->sp_lock);
1328 }
1329 
1330 void
1331 svc_exit(SVCPOOL *pool)
1332 {
1333 	SVCTHREAD *st;
1334 
1335 	mtx_lock(&pool->sp_lock);
1336 
1337 	if (pool->sp_state != SVCPOOL_CLOSING) {
1338 		pool->sp_state = SVCPOOL_CLOSING;
1339 		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
1340 			cv_signal(&st->st_cond);
1341 	}
1342 
1343 	mtx_unlock(&pool->sp_lock);
1344 }
1345 
1346 bool_t
1347 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1348 {
1349 	struct mbuf *m;
1350 	XDR xdrs;
1351 	bool_t stat;
1352 
1353 	m = rqstp->rq_args;
1354 	rqstp->rq_args = NULL;
1355 
1356 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1357 	stat = xargs(&xdrs, args);
1358 	XDR_DESTROY(&xdrs);
1359 
1360 	return (stat);
1361 }
1362 
1363 bool_t
1364 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1365 {
1366 	XDR xdrs;
1367 
1368 	if (rqstp->rq_addr) {
1369 		free(rqstp->rq_addr, M_SONAME);
1370 		rqstp->rq_addr = NULL;
1371 	}
1372 
1373 	xdrs.x_op = XDR_FREE;
1374 	return (xargs(&xdrs, args));
1375 }
1376 
1377 void
1378 svc_freereq(struct svc_req *rqstp)
1379 {
1380 	SVCTHREAD *st;
1381 	SVCPOOL *pool;
1382 
1383 	st = rqstp->rq_thread;
1384 	if (st) {
1385 		pool = st->st_pool;
1386 		if (pool->sp_done)
1387 			pool->sp_done(st, rqstp);
1388 	}
1389 
1390 	if (rqstp->rq_auth.svc_ah_ops)
1391 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1392 
1393 	if (rqstp->rq_xprt) {
1394 		SVC_RELEASE(rqstp->rq_xprt);
1395 	}
1396 
1397 	if (rqstp->rq_addr)
1398 		free(rqstp->rq_addr, M_SONAME);
1399 
1400 	if (rqstp->rq_args)
1401 		m_freem(rqstp->rq_args);
1402 
1403 	free(rqstp, M_RPC);
1404 }
1405