/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters, or more than
	 * 45MB, for buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;
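	/*
	 * Worked example (illustrative numbers only): with nmbclusters =
	 * 25600 and MCLBYTES = 2048, a quarter of the clusters is
	 * 25600 * 2048 / 4 = 12.5MB, which is below the 45MB cap, so
	 * sp_space_high ends up at 12.5MB and sp_space_low at roughly
	 * 8.3MB.
	 */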

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

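/*
 * Usage sketch (hypothetical caller; names are illustrative, not a
 * prescribed API sequence): a server normally creates a pool, adjusts
 * its thread limits, attaches transports and services, and then calls
 * svc_run():
 *
 *	SVCPOOL *pool = svcpool_create("myrpc", NULL);
 *	pool->sp_minthreads = 4;
 *	pool->sp_maxthreads = 16;
 *	... create transports (e.g. svc_vc_create()/svc_dg_create())
 *	... and register programs with svc_reg(), then:
 *	svc_run(pool);
 *	svcpool_destroy(pool);
 */
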
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

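/*
 * Tuning sketch (the exact sysctl names depend on the sysctl_base the
 * caller passed to svcpool_create(); the NFS server tree is used here
 * purely as an example): the two handlers above let an administrator
 * resize a running pool, e.g.:
 *
 *	sysctl vfs.nfsd.minthreads=4
 *	sysctl vfs.nfsd.maxthreads=32
 */
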
/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}
	if (st) {
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&pool->sp_lock);
		return;
	}

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

412 
	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another transport */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		xprt->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

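/*
 * Registration sketch (MYPROG, MYVERS and myprog_dispatch are
 * hypothetical; the dispatch routine is supplied by the service): a
 * service registers each program/version it implements on a transport:
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf))
 *		... handle failure ...
 *
 * Passing a netconfig also advertises the service to the local binder
 * via rpcb_set(); svc_unreg() below withdraws the registration.
 */
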
/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

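/*
 * Dispatch sketch (my_args, my_res and the xdr_* routines are
 * hypothetical; the ownership rules follow svc_executereq() below): a
 * dispatch routine registered via svc_reg() decodes its arguments,
 * replies, and must release the request with svc_freereq():
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		my_args args;
 *		my_res res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		... compute res from args ...
 *		if (!svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *		svc_freereq(rqstp);
 *	}
 */
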
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}

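/*
 * Note for transport implementors (a sketch based on the in-tree
 * socket transports; details vary): a *_create() routine is expected
 * to call svc_xprt_alloc(), fill in the xp_ops vector and socket
 * state, and then call xprt_register(); the matching release path
 * calls svc_xprt_free() once the last reference is dropped.
 */
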
/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (supporting batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				goto call_done;

			default:
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch routine - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water mark yet? If so, assign any
		 * waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}

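/*
 * A note on the hysteresis above (a summary of the code, not new
 * behavior): once sp_space_used reaches sp_space_high the pool stops
 * reading requests, and it resumes (re-assigning any waiting sockets)
 * only after usage falls below sp_space_low, two thirds of the
 * high-water mark. The gap keeps the pool from oscillating between
 * the throttled and unthrottled states on every request.
 */
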
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
				5 * hz);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			}
			if (error == EWOULDBLOCK)
				continue;
			if (error) {
				if (pool->sp_state != SVCPOOL_CLOSING) {
					mtx_unlock(&pool->sp_lock);
					svc_exit(pool);
					mtx_lock(&pool->sp_lock);
				}
				break;
			}

			if (pool->sp_state == SVCPOOL_THREADWANTED) {
				pool->sp_state = SVCPOOL_THREADSTARTING;
				pool->sp_lastcreatetime = time_uptime;
				mtx_unlock(&pool->sp_lock);
				svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	pool->sp_state = SVCPOOL_CLOSING;
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
		cv_signal(&st->st_cond);

	mtx_unlock(&pool->sp_lock);
}

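/*
 * Shutdown sketch (the ordering is implied by the code above):
 * svc_exit() may be called from any thread, e.g. from a signal or
 * shutdown path. It marks the pool SVCPOOL_CLOSING and wakes the idle
 * workers; each worker then falls out of its svc_run_internal() loop,
 * and svc_run() returns to its caller once sp_threadcount reaches
 * zero, after which it is safe to call svcpool_destroy().
 */
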
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}