1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*-
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #include <sys/cdefs.h>
34 /*
35 * svc.c, Server-side remote procedure call interface.
36 *
37 * There are two sets of procedures here. The xprt routines are
38 * for handling transport handles. The svc routines handle the
39 * list of service routines.
40 *
41 * Copyright (C) 1984, Sun Microsystems, Inc.
42 */
43
44 #include <sys/param.h>
45 #include <sys/jail.h>
46 #include <sys/lock.h>
47 #include <sys/kernel.h>
48 #include <sys/kthread.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/protosw.h>
54 #include <sys/queue.h>
55 #include <sys/socketvar.h>
56 #include <sys/systm.h>
57 #include <sys/smp.h>
58 #include <sys/sx.h>
59 #include <sys/ucred.h>
60
61 #include <netinet/tcp.h>
62
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66
67 #include <rpc/rpc_com.h>
68
69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73 char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
79
80 /* *************** SVCXPRT related stuff **************** */
81
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85
86 SVCPOOL*
87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88 {
89 SVCPOOL *pool;
90 SVCGROUP *grp;
91 int g;
92
93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94
95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96 pool->sp_name = name;
97 pool->sp_state = SVCPOOL_INIT;
98 pool->sp_proc = NULL;
99 TAILQ_INIT(&pool->sp_callouts);
100 TAILQ_INIT(&pool->sp_lcallouts);
101 pool->sp_minthreads = 1;
102 pool->sp_maxthreads = 1;
103 pool->sp_groupcount = 1;
104 for (g = 0; g < SVC_MAXGROUPS; g++) {
105 grp = &pool->sp_groups[g];
106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107 grp->sg_pool = pool;
108 grp->sg_state = SVCPOOL_ACTIVE;
109 TAILQ_INIT(&grp->sg_xlist);
110 TAILQ_INIT(&grp->sg_active);
111 LIST_INIT(&grp->sg_idlethreads);
112 grp->sg_minthreads = 1;
113 grp->sg_maxthreads = 1;
114 }
115
116 /*
117 * Don't use more than a quarter of mbuf clusters. Nota bene:
118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119 * on LP64 architectures, so cast to u_long to avoid undefined
120 * behavior. (ILP32 architectures cannot have nmbclusters
121 * large enough to overflow for other reasons.)
122 */
123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125
126 sysctl_ctx_init(&pool->sp_sysctl);
127 if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
130 pool, 0, svcpool_minthread_sysctl, "I",
131 "Minimum number of threads");
132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
134 pool, 0, svcpool_maxthread_sysctl, "I",
135 "Maximum number of threads");
136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
138 pool, 0, svcpool_threads_sysctl, "I",
139 "Current number of threads");
140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142 "Number of thread groups");
143
144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 "request_space_used", CTLFLAG_RD,
146 &pool->sp_space_used,
147 "Space in parsed but not handled requests.");
148
149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 "request_space_used_highest", CTLFLAG_RD,
151 &pool->sp_space_used_highest,
152 "Highest space used since reboot.");
153
154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 "request_space_high", CTLFLAG_RW,
156 &pool->sp_space_high,
157 "Maximum space in parsed but not handled requests.");
158
159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 "request_space_low", CTLFLAG_RW,
161 &pool->sp_space_low,
162 "Low water mark for request space.");
163
164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 "request_space_throttled", CTLFLAG_RD,
166 &pool->sp_space_throttled, 0,
167 "Whether nfs requests are currently throttled");
168
169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 "request_space_throttle_count", CTLFLAG_RD,
171 &pool->sp_space_throttle_count, 0,
172 "Count of times throttling based on request space has occurred");
173 }
174
175 return pool;
176 }
177
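/*
 * Illustrative sketch (not part of the original file): a minimal way a
 * kernel RPC service might create and tune a pool.  The service name
 * "exsvc" is hypothetical, and passing NULL for sysctl_base simply
 * skips creation of the sysctl nodes above.
 *
 *	SVCPOOL *pool;
 *
 *	pool = svcpool_create("exsvc", NULL);
 *	pool->sp_minthreads = 4;	// defaults are 1 and 1
 *	pool->sp_maxthreads = 32;
 *
 * The per-group limits are derived from these totals when svc_run()
 * sizes the thread groups (see svc_run() below).
 */
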
178 /*
179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180 * the pool data structures.
181 */
182 static void
183 svcpool_cleanup(SVCPOOL *pool)
184 {
185 SVCGROUP *grp;
186 SVCXPRT *xprt, *nxprt;
187 struct svc_callout *s;
188 struct svc_loss_callout *sl;
189 struct svcxprt_list cleanup;
190 int g;
191
192 TAILQ_INIT(&cleanup);
193
194 for (g = 0; g < SVC_MAXGROUPS; g++) {
195 grp = &pool->sp_groups[g];
196 mtx_lock(&grp->sg_lock);
197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198 xprt_unregister_locked(xprt);
199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200 }
201 mtx_unlock(&grp->sg_lock);
202 }
203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204 if (xprt->xp_socket != NULL)
205 soshutdown(xprt->xp_socket, SHUT_WR);
206 SVC_RELEASE(xprt);
207 }
208
209 mtx_lock(&pool->sp_lock);
210 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
211 mtx_unlock(&pool->sp_lock);
212 svc_unreg(pool, s->sc_prog, s->sc_vers);
213 mtx_lock(&pool->sp_lock);
214 }
215 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
216 mtx_unlock(&pool->sp_lock);
217 svc_loss_unreg(pool, sl->slc_dispatch);
218 mtx_lock(&pool->sp_lock);
219 }
220 mtx_unlock(&pool->sp_lock);
221 }
222
223 void
224 svcpool_destroy(SVCPOOL *pool)
225 {
226 SVCGROUP *grp;
227 int g;
228
229 svcpool_cleanup(pool);
230
231 for (g = 0; g < SVC_MAXGROUPS; g++) {
232 grp = &pool->sp_groups[g];
233 mtx_destroy(&grp->sg_lock);
234 }
235 mtx_destroy(&pool->sp_lock);
236
237 if (pool->sp_rcache)
238 replay_freecache(pool->sp_rcache);
239
240 sysctl_ctx_free(&pool->sp_sysctl);
241 free(pool, M_RPC);
242 }
243
244 /*
245 * Similar to svcpool_destroy(), except that it does not destroy the actual
246 * data structures. As such, "pool" may be used again.
247 */
248 void
249 svcpool_close(SVCPOOL *pool)
250 {
251 SVCGROUP *grp;
252 int g;
253
254 svcpool_cleanup(pool);
255
256 /* Now, initialize the pool's state for a fresh svc_run() call. */
257 mtx_lock(&pool->sp_lock);
258 pool->sp_state = SVCPOOL_INIT;
259 mtx_unlock(&pool->sp_lock);
260 for (g = 0; g < SVC_MAXGROUPS; g++) {
261 grp = &pool->sp_groups[g];
262 mtx_lock(&grp->sg_lock);
263 grp->sg_state = SVCPOOL_ACTIVE;
264 mtx_unlock(&grp->sg_lock);
265 }
266 }
267
268 /*
269 * Sysctl handler to get the present thread count on a pool
270 */
271 static int
272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
273 {
274 SVCPOOL *pool;
275 int threads, error, g;
276
277 pool = oidp->oid_arg1;
278 threads = 0;
279 mtx_lock(&pool->sp_lock);
280 for (g = 0; g < pool->sp_groupcount; g++)
281 threads += pool->sp_groups[g].sg_threadcount;
282 mtx_unlock(&pool->sp_lock);
283 error = sysctl_handle_int(oidp, &threads, 0, req);
284 return (error);
285 }
286
287 /*
288 * Sysctl handler to set the minimum thread count on a pool
289 */
290 static int
291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
292 {
293 SVCPOOL *pool;
294 int newminthreads, error, g;
295
296 pool = oidp->oid_arg1;
297 newminthreads = pool->sp_minthreads;
298 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
299 if (error == 0 && newminthreads != pool->sp_minthreads) {
300 if (newminthreads > pool->sp_maxthreads)
301 return (EINVAL);
302 mtx_lock(&pool->sp_lock);
303 pool->sp_minthreads = newminthreads;
304 for (g = 0; g < pool->sp_groupcount; g++) {
305 pool->sp_groups[g].sg_minthreads = max(1,
306 pool->sp_minthreads / pool->sp_groupcount);
307 }
308 mtx_unlock(&pool->sp_lock);
309 }
310 return (error);
311 }
312
313 /*
314 * Sysctl handler to set the maximum thread count on a pool
315 */
316 static int
317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
318 {
319 SVCPOOL *pool;
320 int newmaxthreads, error, g;
321
322 pool = oidp->oid_arg1;
323 newmaxthreads = pool->sp_maxthreads;
324 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
325 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
326 if (newmaxthreads < pool->sp_minthreads)
327 return (EINVAL);
328 mtx_lock(&pool->sp_lock);
329 pool->sp_maxthreads = newmaxthreads;
330 for (g = 0; g < pool->sp_groupcount; g++) {
331 pool->sp_groups[g].sg_maxthreads = max(1,
332 pool->sp_maxthreads / pool->sp_groupcount);
333 }
334 mtx_unlock(&pool->sp_lock);
335 }
336 return (error);
337 }
338
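/*
 * A worked example of the two handlers above: writing 64 to the
 * "maxthreads" sysctl on a pool with sp_groupcount == 4 sets
 * sp_maxthreads = 64 and gives every group sg_maxthreads =
 * max(1, 64 / 4) = 16, while a value smaller than sp_minthreads is
 * rejected with EINVAL.  (The numbers are only illustrative.)
 */
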
339 /*
340 * Activate a transport handle.
341 */
342 void
343 xprt_register(SVCXPRT *xprt)
344 {
345 SVCPOOL *pool = xprt->xp_pool;
346 SVCGROUP *grp;
347 int g;
348
349 SVC_ACQUIRE(xprt);
350 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
351 xprt->xp_group = grp = &pool->sp_groups[g];
352 mtx_lock(&grp->sg_lock);
353 xprt->xp_registered = TRUE;
354 xprt->xp_active = FALSE;
355 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
356 mtx_unlock(&grp->sg_lock);
357 }
358
359 /*
360 * De-activate a transport handle. Note: the locked version doesn't
361 * release the transport - caller must do that after dropping the group
362 * lock.
363 */
364 static void
365 xprt_unregister_locked(SVCXPRT *xprt)
366 {
367 SVCGROUP *grp = xprt->xp_group;
368
369 mtx_assert(&grp->sg_lock, MA_OWNED);
370 KASSERT(xprt->xp_registered == TRUE,
371 ("xprt_unregister_locked: not registered"));
372 xprt_inactive_locked(xprt);
373 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
374 xprt->xp_registered = FALSE;
375 }
376
377 void
378 xprt_unregister(SVCXPRT *xprt)
379 {
380 SVCGROUP *grp = xprt->xp_group;
381
382 mtx_lock(&grp->sg_lock);
383 if (xprt->xp_registered == FALSE) {
384 /* Already unregistered by another thread */
385 mtx_unlock(&grp->sg_lock);
386 return;
387 }
388 xprt_unregister_locked(xprt);
389 mtx_unlock(&grp->sg_lock);
390
391 if (xprt->xp_socket != NULL)
392 soshutdown(xprt->xp_socket, SHUT_WR);
393 SVC_RELEASE(xprt);
394 }
395
396 /*
397 * Attempt to assign a service thread to this transport.
398 */
399 static int
400 xprt_assignthread(SVCXPRT *xprt)
401 {
402 SVCGROUP *grp = xprt->xp_group;
403 SVCTHREAD *st;
404
405 mtx_assert(&grp->sg_lock, MA_OWNED);
406 st = LIST_FIRST(&grp->sg_idlethreads);
407 if (st) {
408 LIST_REMOVE(st, st_ilink);
409 SVC_ACQUIRE(xprt);
410 xprt->xp_thread = st;
411 st->st_xprt = xprt;
412 cv_signal(&st->st_cond);
413 return (TRUE);
414 } else {
415 /*
416 * See if we can create a new thread. The
417 * actual thread creation happens in
418 * svc_run_internal because our locking state
419 * is poorly defined (we are typically called
420 * from a socket upcall). Don't create more
421 * than one thread per second.
422 */
423 if (grp->sg_state == SVCPOOL_ACTIVE
424 && grp->sg_lastcreatetime < time_uptime
425 && grp->sg_threadcount < grp->sg_maxthreads) {
426 grp->sg_state = SVCPOOL_THREADWANTED;
427 }
428 }
429 return (FALSE);
430 }
431
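/*
 * Mark a transport as having work to do.  If no thread is currently
 * bound to it, either hand it to an idle thread via xprt_assignthread()
 * or queue it on the group's active list for later service.
 */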
432 void
433 xprt_active(SVCXPRT *xprt)
434 {
435 SVCGROUP *grp = xprt->xp_group;
436
437 mtx_lock(&grp->sg_lock);
438
439 if (!xprt->xp_registered) {
440 /*
441 * Race with xprt_unregister - we lose.
442 */
443 mtx_unlock(&grp->sg_lock);
444 return;
445 }
446
447 if (!xprt->xp_active) {
448 xprt->xp_active = TRUE;
449 if (xprt->xp_thread == NULL) {
450 if (!svc_request_space_available(xprt->xp_pool) ||
451 !xprt_assignthread(xprt))
452 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
453 xp_alink);
454 }
455 }
456
457 mtx_unlock(&grp->sg_lock);
458 }
459
460 void
461 xprt_inactive_locked(SVCXPRT *xprt)
462 {
463 SVCGROUP *grp = xprt->xp_group;
464
465 mtx_assert(&grp->sg_lock, MA_OWNED);
466 if (xprt->xp_active) {
467 if (xprt->xp_thread == NULL)
468 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
469 xprt->xp_active = FALSE;
470 }
471 }
472
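/*
 * Clear a transport's active state, removing it from the group's
 * active list if no thread currently owns it.
 */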
473 void
474 xprt_inactive(SVCXPRT *xprt)
475 {
476 SVCGROUP *grp = xprt->xp_group;
477
478 mtx_lock(&grp->sg_lock);
479 xprt_inactive_locked(xprt);
480 mtx_unlock(&grp->sg_lock);
481 }
482
483 /*
484 * Variant of xprt_inactive() for use only when the caller is sure that
485 * the transport is assigned to a thread, e.g. within receive handlers.
486 */
487 void
488 xprt_inactive_self(SVCXPRT *xprt)
489 {
490
491 KASSERT(xprt->xp_thread != NULL,
492 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
493 xprt->xp_active = FALSE;
494 }
495
496 /*
497 * Add a service program to the callout list.
498 * The dispatch routine will be called when an RPC request for this
499 * program number comes in.
500 */
501 bool_t
502 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
503 void (*dispatch)(struct svc_req *, SVCXPRT *),
504 const struct netconfig *nconf)
505 {
506 SVCPOOL *pool = xprt->xp_pool;
507 struct svc_callout *s;
508 char *netid = NULL;
509 int flag = 0;
510
511 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
512
513 if (xprt->xp_netid) {
514 netid = strdup(xprt->xp_netid, M_RPC);
515 flag = 1;
516 } else if (nconf && nconf->nc_netid) {
517 netid = strdup(nconf->nc_netid, M_RPC);
518 flag = 1;
519 } /* must have been created with svc_raw_create */
520 if ((netid == NULL) && (flag == 1)) {
521 return (FALSE);
522 }
523
524 mtx_lock(&pool->sp_lock);
525 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
526 if (netid)
527 free(netid, M_RPC);
528 if (s->sc_dispatch == dispatch)
529 goto rpcb_it; /* the caller is registering another xprt */
530 mtx_unlock(&pool->sp_lock);
531 return (FALSE);
532 }
533 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
534 if (s == NULL) {
535 if (netid)
536 free(netid, M_RPC);
537 mtx_unlock(&pool->sp_lock);
538 return (FALSE);
539 }
540
541 s->sc_prog = prog;
542 s->sc_vers = vers;
543 s->sc_dispatch = dispatch;
544 s->sc_netid = netid;
545 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
546
547 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
548 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
549
550 rpcb_it:
551 mtx_unlock(&pool->sp_lock);
552 /* now register the information with the local binder service */
553 if (nconf) {
554 bool_t dummy;
555 struct netconfig tnc;
556 struct netbuf nb;
557 tnc = *nconf;
558 nb.buf = &xprt->xp_ltaddr;
559 nb.len = xprt->xp_ltaddr.ss_len;
560 dummy = rpcb_set(prog, vers, &tnc, &nb);
561 return (dummy);
562 }
563 return (TRUE);
564 }
565
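/*
 * Illustrative sketch (not in the original source): registering a
 * dispatch routine for a hypothetical program/version pair.  EXPROG,
 * EXVERS and exprog_dispatch are made-up names; passing NULL for the
 * netconfig skips the rpcbind registration step above.
 *
 *	#define EXPROG	0x20000099	// hypothetical program number
 *	#define EXVERS	1
 *
 *	if (!svc_reg(xprt, EXPROG, EXVERS, exprog_dispatch, NULL))
 *		printf("exsvc: svc_reg failed\n");
 *
 * A sketch of exprog_dispatch() itself follows svc_sendreply() below.
 */
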
566 /*
567 * Remove a service program from the callout list.
568 */
569 void
570 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
571 {
572 struct svc_callout *s;
573
574 /* unregister the information anyway */
575 (void) rpcb_unset(prog, vers, NULL);
576 mtx_lock(&pool->sp_lock);
577 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
578 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
579 if (s->sc_netid)
580 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
581 mem_free(s, sizeof (struct svc_callout));
582 }
583 mtx_unlock(&pool->sp_lock);
584 }
585
586 /*
587 * Add a service connection loss program to the callout list.
588 * The dispatch routine will be called when some transport in this pool dies.
589 */
590 bool_t
591 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
592 {
593 SVCPOOL *pool = xprt->xp_pool;
594 struct svc_loss_callout *s;
595
596 mtx_lock(&pool->sp_lock);
597 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
598 if (s->slc_dispatch == dispatch)
599 break;
600 }
601 if (s != NULL) {
602 mtx_unlock(&pool->sp_lock);
603 return (TRUE);
604 }
605 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
606 if (s == NULL) {
607 mtx_unlock(&pool->sp_lock);
608 return (FALSE);
609 }
610 s->slc_dispatch = dispatch;
611 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
612 mtx_unlock(&pool->sp_lock);
613 return (TRUE);
614 }
615
616 /*
617 * Remove a service connection loss program from the callout list.
618 */
619 void
620 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
621 {
622 struct svc_loss_callout *s;
623
624 mtx_lock(&pool->sp_lock);
625 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
626 if (s->slc_dispatch == dispatch) {
627 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
628 free(s, M_RPC);
629 break;
630 }
631 }
632 mtx_unlock(&pool->sp_lock);
633 }
634
635 /* ********************** CALLOUT list related stuff ************* */
636
637 /*
638 * Search the callout list for a program number, return the callout
639 * struct.
640 */
641 static struct svc_callout *
642 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
643 {
644 struct svc_callout *s;
645
646 mtx_assert(&pool->sp_lock, MA_OWNED);
647 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
648 if (s->sc_prog == prog && s->sc_vers == vers
649 && (netid == NULL || s->sc_netid == NULL ||
650 strcmp(netid, s->sc_netid) == 0))
651 break;
652 }
653
654 return (s);
655 }
656
657 /* ******************* REPLY GENERATION ROUTINES ************ */
658
659 static bool_t
660 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
661 struct mbuf *body)
662 {
663 SVCXPRT *xprt = rqstp->rq_xprt;
664 bool_t ok;
665
666 if (rqstp->rq_args) {
667 m_freem(rqstp->rq_args);
668 rqstp->rq_args = NULL;
669 }
670
671 if (xprt->xp_pool->sp_rcache)
672 replay_setreply(xprt->xp_pool->sp_rcache,
673 rply, svc_getrpccaller(rqstp), body);
674
675 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
676 return (FALSE);
677
678 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
679 if (rqstp->rq_addr) {
680 free(rqstp->rq_addr, M_SONAME);
681 rqstp->rq_addr = NULL;
682 }
683
684 return (ok);
685 }
686
687 /*
688 * Send a reply to an rpc request
689 */
690 bool_t
691 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
692 {
693 struct rpc_msg rply;
694 struct mbuf *m;
695 XDR xdrs;
696 bool_t ok;
697
698 rply.rm_xid = rqstp->rq_xid;
699 rply.rm_direction = REPLY;
700 rply.rm_reply.rp_stat = MSG_ACCEPTED;
701 rply.acpted_rply.ar_verf = rqstp->rq_verf;
702 rply.acpted_rply.ar_stat = SUCCESS;
703 rply.acpted_rply.ar_results.where = NULL;
704 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
705
706 m = m_getcl(M_WAITOK, MT_DATA, 0);
707 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
708 ok = xdr_results(&xdrs, xdr_location);
709 XDR_DESTROY(&xdrs);
710
711 if (ok) {
712 return (svc_sendreply_common(rqstp, &rply, m));
713 } else {
714 m_freem(m);
715 return (FALSE);
716 }
717 }
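
/*
 * Illustrative sketch (not in the original source) of a dispatch
 * routine, as registered with svc_reg() above, built only from
 * functions in this file.  EXPROG_DOUBLE is a hypothetical procedure
 * number; the dispatch routine owns the request and must finish with
 * svc_freereq().
 *
 *	static void
 *	exprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		int arg, res;
 *
 *		switch (rqstp->rq_proc) {
 *		case NULLPROC:
 *			svc_sendreply(rqstp, (xdrproc_t)xdr_void, NULL);
 *			break;
 *		case EXPROG_DOUBLE:
 *			if (!svc_getargs(rqstp, (xdrproc_t)xdr_int, &arg)) {
 *				svcerr_decode(rqstp);
 *				break;
 *			}
 *			res = 2 * arg;
 *			svc_sendreply(rqstp, (xdrproc_t)xdr_int, &res);
 *			break;
 *		default:
 *			svcerr_noproc(rqstp);
 *			break;
 *		}
 *		svc_freereq(rqstp);
 *	}
 */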
718
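/*
 * Send a reply whose results have already been encoded into an mbuf
 * chain by the caller.
 */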
719 bool_t
720 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
721 {
722 struct rpc_msg rply;
723
724 rply.rm_xid = rqstp->rq_xid;
725 rply.rm_direction = REPLY;
726 rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 rply.acpted_rply.ar_stat = SUCCESS;
729 rply.acpted_rply.ar_results.where = NULL;
730 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
731
732 return (svc_sendreply_common(rqstp, &rply, m));
733 }
734
735 /*
736 * No procedure error reply
737 */
738 void
739 svcerr_noproc(struct svc_req *rqstp)
740 {
741 SVCXPRT *xprt = rqstp->rq_xprt;
742 struct rpc_msg rply;
743
744 rply.rm_xid = rqstp->rq_xid;
745 rply.rm_direction = REPLY;
746 rply.rm_reply.rp_stat = MSG_ACCEPTED;
747 rply.acpted_rply.ar_verf = rqstp->rq_verf;
748 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
749
750 if (xprt->xp_pool->sp_rcache)
751 replay_setreply(xprt->xp_pool->sp_rcache,
752 &rply, svc_getrpccaller(rqstp), NULL);
753
754 svc_sendreply_common(rqstp, &rply, NULL);
755 }
756
757 /*
758 * Can't decode args error reply
759 */
760 void
761 svcerr_decode(struct svc_req *rqstp)
762 {
763 SVCXPRT *xprt = rqstp->rq_xprt;
764 struct rpc_msg rply;
765
766 rply.rm_xid = rqstp->rq_xid;
767 rply.rm_direction = REPLY;
768 rply.rm_reply.rp_stat = MSG_ACCEPTED;
769 rply.acpted_rply.ar_verf = rqstp->rq_verf;
770 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
771
772 if (xprt->xp_pool->sp_rcache)
773 replay_setreply(xprt->xp_pool->sp_rcache,
774 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
775
776 svc_sendreply_common(rqstp, &rply, NULL);
777 }
778
779 /*
780 * Some system error
781 */
782 void
783 svcerr_systemerr(struct svc_req *rqstp)
784 {
785 SVCXPRT *xprt = rqstp->rq_xprt;
786 struct rpc_msg rply;
787
788 rply.rm_xid = rqstp->rq_xid;
789 rply.rm_direction = REPLY;
790 rply.rm_reply.rp_stat = MSG_ACCEPTED;
791 rply.acpted_rply.ar_verf = rqstp->rq_verf;
792 rply.acpted_rply.ar_stat = SYSTEM_ERR;
793
794 if (xprt->xp_pool->sp_rcache)
795 replay_setreply(xprt->xp_pool->sp_rcache,
796 &rply, svc_getrpccaller(rqstp), NULL);
797
798 svc_sendreply_common(rqstp, &rply, NULL);
799 }
800
801 /*
802 * Authentication error reply
803 */
804 void
805 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
806 {
807 SVCXPRT *xprt = rqstp->rq_xprt;
808 struct rpc_msg rply;
809
810 rply.rm_xid = rqstp->rq_xid;
811 rply.rm_direction = REPLY;
812 rply.rm_reply.rp_stat = MSG_DENIED;
813 rply.rjcted_rply.rj_stat = AUTH_ERROR;
814 rply.rjcted_rply.rj_why = why;
815
816 if (xprt->xp_pool->sp_rcache)
817 replay_setreply(xprt->xp_pool->sp_rcache,
818 &rply, svc_getrpccaller(rqstp), NULL);
819
820 svc_sendreply_common(rqstp, &rply, NULL);
821 }
822
823 /*
824 * Auth too weak error reply
825 */
826 void
827 svcerr_weakauth(struct svc_req *rqstp)
828 {
829
830 svcerr_auth(rqstp, AUTH_TOOWEAK);
831 }
832
833 /*
834 * Program unavailable error reply
835 */
836 void
837 svcerr_noprog(struct svc_req *rqstp)
838 {
839 SVCXPRT *xprt = rqstp->rq_xprt;
840 struct rpc_msg rply;
841
842 rply.rm_xid = rqstp->rq_xid;
843 rply.rm_direction = REPLY;
844 rply.rm_reply.rp_stat = MSG_ACCEPTED;
845 rply.acpted_rply.ar_verf = rqstp->rq_verf;
846 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
847
848 if (xprt->xp_pool->sp_rcache)
849 replay_setreply(xprt->xp_pool->sp_rcache,
850 &rply, svc_getrpccaller(rqstp), NULL);
851
852 svc_sendreply_common(rqstp, &rply, NULL);
853 }
854
855 /*
856 * Program version mismatch error reply
857 */
858 void
859 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
860 {
861 SVCXPRT *xprt = rqstp->rq_xprt;
862 struct rpc_msg rply;
863
864 rply.rm_xid = rqstp->rq_xid;
865 rply.rm_direction = REPLY;
866 rply.rm_reply.rp_stat = MSG_ACCEPTED;
867 rply.acpted_rply.ar_verf = rqstp->rq_verf;
868 rply.acpted_rply.ar_stat = PROG_MISMATCH;
869 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
870 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
871
872 if (xprt->xp_pool->sp_rcache)
873 replay_setreply(xprt->xp_pool->sp_rcache,
874 &rply, svc_getrpccaller(rqstp), NULL);
875
876 svc_sendreply_common(rqstp, &rply, NULL);
877 }
878
879 /*
880 * Allocate a new server transport structure. All fields are
881 * initialized to zero and xp_p3 is initialized to point at an
882 * extension structure to hold various flags and authentication
883 * parameters.
884 */
885 SVCXPRT *
886 svc_xprt_alloc(void)
887 {
888 SVCXPRT *xprt;
889 SVCXPRT_EXT *ext;
890
891 xprt = mem_alloc(sizeof(SVCXPRT));
892 ext = mem_alloc(sizeof(SVCXPRT_EXT));
893 xprt->xp_p3 = ext;
894 refcount_init(&xprt->xp_refs, 1);
895
896 return (xprt);
897 }
898
899 /*
900 * Free a server transport structure.
901 */
902 void
903 svc_xprt_free(SVCXPRT *xprt)
904 {
905
906 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
907 /* The size argument is ignored, so 0 is ok. */
908 mem_free(xprt->xp_gidp, 0);
909 mem_free(xprt, sizeof(SVCXPRT));
910 }
911
912 /* ******************* SERVER INPUT STUFF ******************* */
913
914 /*
915 * Read RPC requests from a transport and queue them to be
916 * executed. We handle authentication and replay cache replies here.
917 * Actually dispatching the RPC is deferred till svc_executereq.
918 */
919 static enum xprt_stat
920 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
921 {
922 SVCPOOL *pool = xprt->xp_pool;
923 struct svc_req *r;
924 struct rpc_msg msg;
925 struct mbuf *args;
926 struct svc_loss_callout *s;
927 enum xprt_stat stat;
928
929 /* now receive msgs from the transport (support batch calls) */
930 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
931
932 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
933 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
934 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
935 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
936 enum auth_stat why;
937
938 /*
939 * Handle replays and authenticate before queuing the
940 * request to be executed.
941 */
942 SVC_ACQUIRE(xprt);
943 r->rq_xprt = xprt;
944 if (pool->sp_rcache) {
945 struct rpc_msg repmsg;
946 struct mbuf *repbody;
947 enum replay_state rs;
948 rs = replay_find(pool->sp_rcache, &msg,
949 svc_getrpccaller(r), &repmsg, &repbody);
950 switch (rs) {
951 case RS_NEW:
952 break;
953 case RS_DONE:
954 SVC_REPLY(xprt, &repmsg, r->rq_addr,
955 repbody, &r->rq_reply_seq);
956 if (r->rq_addr) {
957 free(r->rq_addr, M_SONAME);
958 r->rq_addr = NULL;
959 }
960 m_freem(args);
961 goto call_done;
962
963 default:
964 m_freem(args);
965 goto call_done;
966 }
967 }
968
969 r->rq_xid = msg.rm_xid;
970 r->rq_prog = msg.rm_call.cb_prog;
971 r->rq_vers = msg.rm_call.cb_vers;
972 r->rq_proc = msg.rm_call.cb_proc;
973 r->rq_size = sizeof(*r) + m_length(args, NULL);
974 r->rq_args = args;
975 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
976 /*
977 * RPCSEC_GSS uses this return code
978 * for requests that form part of its
979 * context establishment protocol and
980 * should not be dispatched to the
981 * application.
982 */
983 if (why != RPCSEC_GSS_NODISPATCH)
984 svcerr_auth(r, why);
985 goto call_done;
986 }
987
988 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
989 svcerr_decode(r);
990 goto call_done;
991 }
992
993 /*
994 * Defer enabling DDP until the first non-NULLPROC RPC
995 * is received to allow STARTTLS authentication to
996 * enable TLS offload first.
997 */
998 if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC &&
999 xprt->xp_socket != NULL &&
1000 atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) {
1001 if (xprt->xp_socket->so_proto->pr_protocol ==
1002 IPPROTO_TCP) {
1003 int optval = 1;
1004
1005 (void)so_setsockopt(xprt->xp_socket,
1006 IPPROTO_TCP, TCP_USE_DDP, &optval,
1007 sizeof(optval));
1008 }
1009 }
1010
1011 /*
1012 * Everything checks out, return request to caller.
1013 */
1014 *rqstp_ret = r;
1015 r = NULL;
1016 }
1017 call_done:
1018 if (r) {
1019 svc_freereq(r);
1020 r = NULL;
1021 }
1022 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1023 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1024 (*s->slc_dispatch)(xprt);
1025 xprt_unregister(xprt);
1026 }
1027
1028 return (stat);
1029 }
1030
1031 static void
1032 svc_executereq(struct svc_req *rqstp)
1033 {
1034 SVCXPRT *xprt = rqstp->rq_xprt;
1035 SVCPOOL *pool = xprt->xp_pool;
1036 int prog_found;
1037 rpcvers_t low_vers;
1038 rpcvers_t high_vers;
1039 struct svc_callout *s;
1040
1041 /* now match message with a registered service */
1042 prog_found = FALSE;
1043 low_vers = (rpcvers_t) -1L;
1044 high_vers = (rpcvers_t) 0L;
1045 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1046 if (s->sc_prog == rqstp->rq_prog) {
1047 if (s->sc_vers == rqstp->rq_vers) {
1048 /*
1049 * We hand ownership of rqstp to the
1050 * dispatch method - it must call
1051 * svc_freereq.
1052 */
1053 (*s->sc_dispatch)(rqstp, xprt);
1054 return;
1055 } /* found correct version */
1056 prog_found = TRUE;
1057 if (s->sc_vers < low_vers)
1058 low_vers = s->sc_vers;
1059 if (s->sc_vers > high_vers)
1060 high_vers = s->sc_vers;
1061 } /* found correct program */
1062 }
1063
1064 /*
1065 * if we got here, the program or version
1066 * is not served ...
1067 */
1068 if (prog_found)
1069 svcerr_progvers(rqstp, low_vers, high_vers);
1070 else
1071 svcerr_noprog(rqstp);
1072
1073 svc_freereq(rqstp);
1074 }
1075
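/*
 * Time out transports that have been idle longer than their
 * xp_idletimeout.  Called with the group lock held; it is dropped
 * while the victims are shut down and released.
 */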
1076 static void
1077 svc_checkidle(SVCGROUP *grp)
1078 {
1079 SVCXPRT *xprt, *nxprt;
1080 time_t timo;
1081 struct svcxprt_list cleanup;
1082
1083 TAILQ_INIT(&cleanup);
1084 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1085 /*
1086 * Only some transports have idle timers. Don't time
1087 * something out which is just waking up.
1088 */
1089 if (!xprt->xp_idletimeout || xprt->xp_thread)
1090 continue;
1091
1092 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1093 if (time_uptime > timo) {
1094 xprt_unregister_locked(xprt);
1095 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1096 }
1097 }
1098
1099 mtx_unlock(&grp->sg_lock);
1100 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1101 soshutdown(xprt->xp_socket, SHUT_WR);
1102 SVC_RELEASE(xprt);
1103 }
1104 mtx_lock(&grp->sg_lock);
1105 }
1106
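/*
 * Hand any queued active transports to idle threads, e.g. after the
 * request-space throttle has been released.
 */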
1107 static void
1108 svc_assign_waiting_sockets(SVCPOOL *pool)
1109 {
1110 SVCGROUP *grp;
1111 SVCXPRT *xprt;
1112 int g;
1113
1114 for (g = 0; g < pool->sp_groupcount; g++) {
1115 grp = &pool->sp_groups[g];
1116 mtx_lock(&grp->sg_lock);
1117 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1118 if (xprt_assignthread(xprt))
1119 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1120 else
1121 break;
1122 }
1123 mtx_unlock(&grp->sg_lock);
1124 }
1125 }
1126
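/*
 * Account for request-buffer space entering or leaving the pool and
 * toggle throttling at the sp_space_high / sp_space_low water marks.
 */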
1127 static void
1128 svc_change_space_used(SVCPOOL *pool, long delta)
1129 {
1130 unsigned long value;
1131
1132 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1133 if (delta > 0) {
1134 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1135 pool->sp_space_throttled = TRUE;
1136 pool->sp_space_throttle_count++;
1137 }
1138 if (value > pool->sp_space_used_highest)
1139 pool->sp_space_used_highest = value;
1140 } else {
1141 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1142 pool->sp_space_throttled = FALSE;
1143 svc_assign_waiting_sockets(pool);
1144 }
1145 }
1146 }
1147
1148 static bool_t
1149 svc_request_space_available(SVCPOOL *pool)
1150 {
1151
1152 if (pool->sp_space_throttled)
1153 return (FALSE);
1154 return (TRUE);
1155 }
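
/*
 * A worked example of the thresholds set up in svcpool_create(): with
 * nmbclusters = 65536 and MCLBYTES = 2048 (illustrative values only,
 * since nmbclusters is auto-tuned), sp_space_high is 65536 * 2048 / 4 =
 * 32 MiB and sp_space_low is two thirds of that, roughly 21 MiB.  New
 * requests stop being parsed once sp_space_used reaches the high mark
 * and resume once it falls below the low mark.
 */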
1156
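/*
 * Main loop executed by every service thread (including the master
 * started by svc_run()): take an active transport, drain its requests
 * with svc_getreq(), execute them with svc_executereq(), and otherwise
 * sleep on the idle list, spawning or retiring worker threads as the
 * load and the group's min/max limits dictate.
 */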
1157 static void
1158 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1159 {
1160 SVCPOOL *pool = grp->sg_pool;
1161 SVCTHREAD *st, *stpref;
1162 SVCXPRT *xprt;
1163 enum xprt_stat stat;
1164 struct svc_req *rqstp;
1165 struct proc *p;
1166 long sz;
1167 int error;
1168
1169 st = mem_alloc(sizeof(*st));
1170 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1171 st->st_pool = pool;
1172 st->st_xprt = NULL;
1173 STAILQ_INIT(&st->st_reqs);
1174 cv_init(&st->st_cond, "rpcsvc");
1175
1176 mtx_lock(&grp->sg_lock);
1177
1178 /*
1179 * If we are a new thread which was spawned to cope with
1180 * increased load, set the state back to SVCPOOL_ACTIVE.
1181 */
1182 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1183 grp->sg_state = SVCPOOL_ACTIVE;
1184
1185 while (grp->sg_state != SVCPOOL_CLOSING) {
1186 /*
1187 * Create new thread if requested.
1188 */
1189 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1190 grp->sg_state = SVCPOOL_THREADSTARTING;
1191 grp->sg_lastcreatetime = time_uptime;
1192 mtx_unlock(&grp->sg_lock);
1193 svc_new_thread(grp);
1194 mtx_lock(&grp->sg_lock);
1195 continue;
1196 }
1197
1198 /*
1199 * Check for idle transports once per second.
1200 */
1201 if (time_uptime > grp->sg_lastidlecheck) {
1202 grp->sg_lastidlecheck = time_uptime;
1203 svc_checkidle(grp);
1204 }
1205
1206 xprt = st->st_xprt;
1207 if (!xprt) {
1208 /*
1209 * Enforce maxthreads count.
1210 */
1211 if (!ismaster && grp->sg_threadcount >
1212 grp->sg_maxthreads)
1213 break;
1214
1215 /*
1216 * Before sleeping, see if we can find an
1217 * active transport which isn't being serviced
1218 * by a thread.
1219 */
1220 if (svc_request_space_available(pool) &&
1221 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1222 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1223 SVC_ACQUIRE(xprt);
1224 xprt->xp_thread = st;
1225 st->st_xprt = xprt;
1226 continue;
1227 }
1228
1229 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1230 if (ismaster || (!ismaster &&
1231 grp->sg_threadcount > grp->sg_minthreads))
1232 error = cv_timedwait_sig(&st->st_cond,
1233 &grp->sg_lock, 5 * hz);
1234 else
1235 error = cv_wait_sig(&st->st_cond,
1236 &grp->sg_lock);
1237 if (st->st_xprt == NULL)
1238 LIST_REMOVE(st, st_ilink);
1239
1240 /*
1241 * Reduce worker thread count when idle.
1242 */
1243 if (error == EWOULDBLOCK) {
1244 if (!ismaster
1245 && (grp->sg_threadcount
1246 > grp->sg_minthreads)
1247 && !st->st_xprt)
1248 break;
1249 } else if (error != 0) {
1250 KASSERT(error == EINTR || error == ERESTART,
1251 ("non-signal error %d", error));
1252 mtx_unlock(&grp->sg_lock);
1253 p = curproc;
1254 PROC_LOCK(p);
1255 if (P_SHOULDSTOP(p) ||
1256 (p->p_flag & P_TOTAL_STOP) != 0) {
1257 thread_suspend_check(0);
1258 PROC_UNLOCK(p);
1259 mtx_lock(&grp->sg_lock);
1260 } else {
1261 PROC_UNLOCK(p);
1262 svc_exit(pool);
1263 mtx_lock(&grp->sg_lock);
1264 break;
1265 }
1266 }
1267 continue;
1268 }
1269 mtx_unlock(&grp->sg_lock);
1270
1271 /*
1272 * Drain the transport socket and queue up any RPCs.
1273 */
1274 xprt->xp_lastactive = time_uptime;
1275 do {
1276 if (!svc_request_space_available(pool))
1277 break;
1278 rqstp = NULL;
1279 stat = svc_getreq(xprt, &rqstp);
1280 if (rqstp) {
1281 svc_change_space_used(pool, rqstp->rq_size);
1282 /*
1283 * See if the application has a preference
1284 * for some other thread.
1285 */
1286 if (pool->sp_assign) {
1287 stpref = pool->sp_assign(st, rqstp);
1288 rqstp->rq_thread = stpref;
1289 STAILQ_INSERT_TAIL(&stpref->st_reqs,
1290 rqstp, rq_link);
1291 mtx_unlock(&stpref->st_lock);
1292 if (stpref != st)
1293 rqstp = NULL;
1294 } else {
1295 rqstp->rq_thread = st;
1296 STAILQ_INSERT_TAIL(&st->st_reqs,
1297 rqstp, rq_link);
1298 }
1299 }
1300 } while (rqstp == NULL && stat == XPRT_MOREREQS
1301 && grp->sg_state != SVCPOOL_CLOSING);
1302
1303 /*
1304 * Move this transport to the end of the active list to
1305 * ensure fairness when multiple transports are active.
1306 * If this was the last queued request, svc_getreq will end
1307 * up calling xprt_inactive to remove from the active list.
1308 */
1309 mtx_lock(&grp->sg_lock);
1310 xprt->xp_thread = NULL;
1311 st->st_xprt = NULL;
1312 if (xprt->xp_active) {
1313 if (!svc_request_space_available(pool) ||
1314 !xprt_assignthread(xprt))
1315 TAILQ_INSERT_TAIL(&grp->sg_active,
1316 xprt, xp_alink);
1317 }
1318 mtx_unlock(&grp->sg_lock);
1319 SVC_RELEASE(xprt);
1320
1321 /*
1322 * Execute what we have queued.
1323 */
1324 mtx_lock(&st->st_lock);
1325 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1326 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1327 mtx_unlock(&st->st_lock);
1328 sz = (long)rqstp->rq_size;
1329 svc_executereq(rqstp);
1330 svc_change_space_used(pool, -sz);
1331 mtx_lock(&st->st_lock);
1332 }
1333 mtx_unlock(&st->st_lock);
1334 mtx_lock(&grp->sg_lock);
1335 }
1336
1337 if (st->st_xprt) {
1338 xprt = st->st_xprt;
1339 st->st_xprt = NULL;
1340 SVC_RELEASE(xprt);
1341 }
1342 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1343 mtx_destroy(&st->st_lock);
1344 cv_destroy(&st->st_cond);
1345 mem_free(st, sizeof(*st));
1346
1347 grp->sg_threadcount--;
1348 if (!ismaster)
1349 wakeup(grp);
1350 mtx_unlock(&grp->sg_lock);
1351 }
1352
1353 static void
1354 svc_thread_start(void *arg)
1355 {
1356
1357 svc_run_internal((SVCGROUP *) arg, FALSE);
1358 kthread_exit();
1359 }
1360
1361 static void
1362 svc_new_thread(SVCGROUP *grp)
1363 {
1364 SVCPOOL *pool = grp->sg_pool;
1365 struct thread *td;
1366
1367 mtx_lock(&grp->sg_lock);
1368 grp->sg_threadcount++;
1369 mtx_unlock(&grp->sg_lock);
1370 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1371 "%s: service", pool->sp_name);
1372 }
1373
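/*
 * Entry point for the master service thread: size the thread groups,
 * start the minimum number of workers, service requests in the calling
 * thread and, once svc_exit() has been called, wait for all workers to
 * finish.
 */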
1374 void
1375 svc_run(SVCPOOL *pool)
1376 {
1377 int g, i;
1378 struct proc *p;
1379 struct thread *td;
1380 SVCGROUP *grp;
1381
1382 p = curproc;
1383 td = curthread;
1384 snprintf(td->td_name, sizeof(td->td_name),
1385 "%s: master", pool->sp_name);
1386 pool->sp_state = SVCPOOL_ACTIVE;
1387 pool->sp_proc = p;
1388
1389 /* Choose group count based on number of threads and CPUs. */
1390 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1391 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1392 for (g = 0; g < pool->sp_groupcount; g++) {
1393 grp = &pool->sp_groups[g];
1394 grp->sg_minthreads = max(1,
1395 pool->sp_minthreads / pool->sp_groupcount);
1396 grp->sg_maxthreads = max(1,
1397 pool->sp_maxthreads / pool->sp_groupcount);
1398 grp->sg_lastcreatetime = time_uptime;
1399 }
1400
1401 /* Starting threads */
1402 pool->sp_groups[0].sg_threadcount++;
1403 for (g = 0; g < pool->sp_groupcount; g++) {
1404 grp = &pool->sp_groups[g];
1405 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1406 svc_new_thread(grp);
1407 }
1408 svc_run_internal(&pool->sp_groups[0], TRUE);
1409
1410 /* Waiting for threads to stop. */
1411 for (g = 0; g < pool->sp_groupcount; g++) {
1412 grp = &pool->sp_groups[g];
1413 mtx_lock(&grp->sg_lock);
1414 while (grp->sg_threadcount > 0)
1415 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1416 mtx_unlock(&grp->sg_lock);
1417 }
1418 }
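
/*
 * Putting the pieces together - an illustrative service lifecycle (the
 * names are hypothetical and transport creation is elided, since the
 * svc_vc/svc_dg constructors live in other files):
 *
 *	pool = svcpool_create("exsvc", NULL);
 *	pool->sp_maxthreads = 16;
 *	// ...create transports for listening sockets and svc_reg() the
 *	// service's programs on them...
 *	svc_run(pool);			// blocks until svc_exit(pool)
 *	svcpool_destroy(pool);		// or svcpool_close() to reuse it
 */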
1419
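/*
 * Ask every thread in the pool to terminate, waking any that are idle.
 */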
1420 void
1421 svc_exit(SVCPOOL *pool)
1422 {
1423 SVCGROUP *grp;
1424 SVCTHREAD *st;
1425 int g;
1426
1427 pool->sp_state = SVCPOOL_CLOSING;
1428 for (g = 0; g < pool->sp_groupcount; g++) {
1429 grp = &pool->sp_groups[g];
1430 mtx_lock(&grp->sg_lock);
1431 if (grp->sg_state != SVCPOOL_CLOSING) {
1432 grp->sg_state = SVCPOOL_CLOSING;
1433 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1434 cv_signal(&st->st_cond);
1435 }
1436 mtx_unlock(&grp->sg_lock);
1437 }
1438 }
1439
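/*
 * Decode the request's saved argument mbuf chain (rq_args) into the
 * caller's structure.
 */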
1440 bool_t
1441 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1442 {
1443 struct mbuf *m;
1444 XDR xdrs;
1445 bool_t stat;
1446
1447 m = rqstp->rq_args;
1448 rqstp->rq_args = NULL;
1449
1450 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1451 stat = xargs(&xdrs, args);
1452 XDR_DESTROY(&xdrs);
1453
1454 return (stat);
1455 }
1456
1457 bool_t
1458 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1459 {
1460 XDR xdrs;
1461
1462 if (rqstp->rq_addr) {
1463 free(rqstp->rq_addr, M_SONAME);
1464 rqstp->rq_addr = NULL;
1465 }
1466
1467 xdrs.x_op = XDR_FREE;
1468 return (xargs(&xdrs, args));
1469 }
1470
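/*
 * Release all resources associated with a request: notify the pool's
 * sp_done callback, drop the auth handle, the transport reference, any
 * remaining address and argument data, and finally the request itself.
 */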
1471 void
1472 svc_freereq(struct svc_req *rqstp)
1473 {
1474 SVCTHREAD *st;
1475 SVCPOOL *pool;
1476
1477 st = rqstp->rq_thread;
1478 if (st) {
1479 pool = st->st_pool;
1480 if (pool->sp_done)
1481 pool->sp_done(st, rqstp);
1482 }
1483
1484 if (rqstp->rq_auth.svc_ah_ops)
1485 SVCAUTH_RELEASE(&rqstp->rq_auth);
1486
1487 if (rqstp->rq_xprt) {
1488 SVC_RELEASE(rqstp->rq_xprt);
1489 }
1490
1491 if (rqstp->rq_addr)
1492 free(rqstp->rq_addr, M_SONAME);
1493
1494 if (rqstp->rq_args)
1495 m_freem(rqstp->rq_args);
1496
1497 free(rqstp, M_RPC);
1498 }
1499