1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3 /*-
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #include <sys/cdefs.h>
34 /*
35 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
36 *
37 * Copyright (C) 1984, Sun Microsystems, Inc.
38 *
39 * TCP based RPC supports 'batched calls'.
40 * A sequence of calls may be batched-up in a send buffer. The rpc call
 * returns immediately to the client even though the call was not necessarily
42 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
43 * the rpc timeout value is zero (see clnt.h, rpc).
44 *
45 * Clients should NOT casually batch calls that in fact return results; that is,
46 * the server side should be aware that a call is batched and not produce any
47 * return message. Batched calls that produce many result messages can
48 * deadlock (netlock) the client and the server....
49 *
50 * Now go hang yourself.
51 */
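
/*
 * In this kernel implementation a "batched" call therefore amounts to
 * invoking the cl_call method (clnt_vc_call() below) with a timeout of
 * zero and no default timeout set via CLSET_TIMEOUT.  A minimal sketch,
 * assuming the CLNT_CALL_MBUF() macro from rpc/clnt.h and a CLIENT "cl"
 * already created by clnt_vc_create():
 *
 *	struct timeval zero = { 0, 0 };
 *	struct mbuf *rep = NULL;
 *
 *	stat = CLNT_CALL_MBUF(cl, NULL, proc, args, &rep, zero);
 *
 * The request is sent, but the call returns RPC_TIMEDOUT without waiting
 * for (or expecting) a reply.
 */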
52
53 #include "opt_kern_tls.h"
54
55 #include <sys/param.h>
56 #include <sys/systm.h>
57 #include <sys/kernel.h>
58 #include <sys/kthread.h>
59 #include <sys/ktls.h>
60 #include <sys/lock.h>
61 #include <sys/malloc.h>
62 #include <sys/mbuf.h>
63 #include <sys/mutex.h>
64 #include <sys/pcpu.h>
65 #include <sys/proc.h>
66 #include <sys/protosw.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/sx.h>
70 #include <sys/syslog.h>
71 #include <sys/time.h>
72 #include <sys/uio.h>
73
74 #include <net/vnet.h>
75
76 #include <netinet/tcp.h>
77
78 #include <rpc/rpc.h>
79 #include <rpc/rpc_com.h>
80 #include <rpc/krpc.h>
81 #include <rpc/rpcsec_tls.h>
82
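/*
 * Layout of a credentials control message.  (Inherited from the userland
 * RPC code; it appears to be unused in this file now that AF_LOCAL
 * transports are rejected by clnt_vc_create().)
 */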
83 struct cmessage {
84 struct cmsghdr cmsg;
85 struct cmsgcred cmcred;
86 };
87
88 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
89 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
90 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
91 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
92 static void clnt_vc_abort(CLIENT *);
93 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
94 static void clnt_vc_close(CLIENT *);
95 static void clnt_vc_destroy(CLIENT *);
96 static bool_t time_not_ok(struct timeval *);
97 static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
98 static void clnt_vc_dotlsupcall(void *data);
99
100 static const struct clnt_ops clnt_vc_ops = {
101 .cl_call = clnt_vc_call,
102 .cl_abort = clnt_vc_abort,
103 .cl_geterr = clnt_vc_geterr,
104 .cl_freeres = clnt_vc_freeres,
105 .cl_close = clnt_vc_close,
106 .cl_destroy = clnt_vc_destroy,
107 .cl_control = clnt_vc_control
108 };
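
/*
 * These methods are invoked via the CLNT_*() macros in rpc/clnt.h
 * (CLNT_CALL_MBUF(), CLNT_CONTROL(), CLNT_CLOSE() and friends).
 */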
109
110 static void clnt_vc_upcallsdone(struct ct_data *);
111
112 /*
113 * Create a client handle for a connection.
114 * Default options are set, which the user can change using clnt_control()'s.
115 * The rpc/vc package does buffering similar to stdio, so the client
116 * must pick send and receive buffer sizes, 0 => use the default.
 * NB: the socket pointer is stored in a private area.
 * NB: The rpch->cl_auth is set to null authentication. Caller may wish to
 * set this to something more useful.
 *
 * so should be an open socket
122 */
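/*
 * A minimal creation sketch (hypothetical caller; in-tree consumers
 * normally go through the clnt_reconnect_*() layer, which calls this
 * routine internally):
 *
 *	CLIENT *cl;
 *
 *	cl = clnt_vc_create(so, sa, 100003, 3, 0, 0, 1);
 *	if (cl == NULL)
 *		error = rpc_createerr.cf_error.re_errno;
 *
 * where "so" is an open TCP socket, "sa" the server's address and
 * 100003/3 an illustrative program/version pair (NFSv3).
 */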
123 CLIENT *
clnt_vc_create(
	struct socket *so,		/* open socket */
126 struct sockaddr *raddr, /* servers address */
127 const rpcprog_t prog, /* program number */
128 const rpcvers_t vers, /* version number */
	size_t sendsz,			/* buffer send size */
	size_t recvsz,			/* buffer recv size */
131 int intrflag) /* interruptible */
132 {
133 CLIENT *cl; /* client handle */
134 struct ct_data *ct = NULL; /* client handle */
135 struct timeval now;
136 struct rpc_msg call_msg;
137 static uint32_t disrupt;
138 struct __rpc_sockinfo si;
139 XDR xdrs;
140 int error, interrupted, one = 1, sleep_flag;
141 struct sockopt sopt;
142
143 KASSERT(raddr->sa_family != AF_LOCAL,
144 ("%s: kernel RPC over unix(4) not supported", __func__));
145
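	/*
	 * "disrupt" perturbs the initial XID (see below) so that different
	 * client handles are unlikely to start with the same XID sequence.
	 */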
146 if (disrupt == 0)
147 disrupt = (uint32_t)(long)raddr;
148
149 cl = (CLIENT *)mem_alloc(sizeof (*cl));
150 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
151
152 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
153 ct->ct_threads = 0;
154 ct->ct_closing = FALSE;
155 ct->ct_closed = FALSE;
156 ct->ct_upcallrefs = 0;
157 ct->ct_rcvstate = RPCRCVSTATE_NORMAL;
158
159 if ((so->so_state & SS_ISCONNECTED) == 0) {
160 error = soconnect(so, raddr, curthread);
161 SOCK_LOCK(so);
162 interrupted = 0;
163 sleep_flag = PSOCK;
164 if (intrflag != 0)
165 sleep_flag |= PCATCH;
166 while ((so->so_state & SS_ISCONNECTING)
167 && so->so_error == 0) {
168 error = msleep(&so->so_timeo, SOCK_MTX(so),
169 sleep_flag, "connec", 0);
170 if (error) {
171 if (error == EINTR || error == ERESTART)
172 interrupted = 1;
173 break;
174 }
175 }
176 if (error == 0) {
177 error = so->so_error;
178 so->so_error = 0;
179 }
180 SOCK_UNLOCK(so);
181 if (error) {
182 if (!interrupted)
183 so->so_state &= ~SS_ISCONNECTING;
184 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
185 rpc_createerr.cf_error.re_errno = error;
186 goto err;
187 }
188 }
189
190 if (!__rpc_socket2sockinfo(so, &si)) {
191 goto err;
192 }
193
194 if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
195 bzero(&sopt, sizeof(sopt));
196 sopt.sopt_dir = SOPT_SET;
197 sopt.sopt_level = SOL_SOCKET;
198 sopt.sopt_name = SO_KEEPALIVE;
199 sopt.sopt_val = &one;
200 sopt.sopt_valsize = sizeof(one);
201 sosetopt(so, &sopt);
202 }
203
204 if (so->so_proto->pr_protocol == IPPROTO_TCP) {
205 bzero(&sopt, sizeof(sopt));
206 sopt.sopt_dir = SOPT_SET;
207 sopt.sopt_level = IPPROTO_TCP;
208 sopt.sopt_name = TCP_NODELAY;
209 sopt.sopt_val = &one;
210 sopt.sopt_valsize = sizeof(one);
211 sosetopt(so, &sopt);
212 }
213
214 ct->ct_closeit = FALSE;
215
216 /*
217 * Set up private data struct
218 */
219 ct->ct_socket = so;
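	/*
	 * A wait time of -1 means that no default timeout has been set
	 * (see CLSET_TIMEOUT); clnt_vc_call() will then use the timeout
	 * supplied by its caller.
	 */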
220 ct->ct_wait.tv_sec = -1;
221 ct->ct_wait.tv_usec = -1;
222 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
223
224 /*
225 * Initialize call message
226 */
227 getmicrotime(&now);
228 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
229 call_msg.rm_xid = ct->ct_xid;
230 call_msg.rm_direction = CALL;
231 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
232 call_msg.rm_call.cb_prog = (uint32_t)prog;
233 call_msg.rm_call.cb_vers = (uint32_t)vers;
234
235 /*
236 * pre-serialize the static part of the call msg and stash it away
237 */
238 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
239 XDR_ENCODE);
240 if (! xdr_callhdr(&xdrs, &call_msg))
241 goto err;
242 ct->ct_mpos = XDR_GETPOS(&xdrs);
243 XDR_DESTROY(&xdrs);
244 ct->ct_waitchan = "rpcrecv";
245 ct->ct_waitflag = 0;
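	/*
	 * ct_waitchan and ct_waitflag control the msleep() done while
	 * waiting for a reply in clnt_vc_call(); they may be changed via
	 * CLSET_WAITCHAN and CLSET_INTERRUPTIBLE.
	 */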
246
247 /*
	 * Create a client handle which uses xdrmbuf for serialization
249 * and authnone for authentication.
250 */
251 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
252 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
253 error = soreserve(ct->ct_socket, sendsz, recvsz);
254 if (error != 0)
255 goto err;
256 cl->cl_refs = 1;
257 cl->cl_ops = &clnt_vc_ops;
258 cl->cl_private = ct;
259 cl->cl_auth = authnone_create();
260
261 SOCK_RECVBUF_LOCK(ct->ct_socket);
262 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
263 SOCK_RECVBUF_UNLOCK(ct->ct_socket);
264
265 ct->ct_raw = NULL;
266 ct->ct_record = NULL;
267 ct->ct_record_resid = 0;
268 ct->ct_tlsstate = RPCTLS_NONE;
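	/*
	 * TLS, if any, is enabled later via CLSET_TLS, once the handshake
	 * performed by the rpctlscd(8) daemon has completed.
	 */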
269 TAILQ_INIT(&ct->ct_pending);
270 return (cl);
271
272 err:
273 mtx_destroy(&ct->ct_lock);
274 mem_free(ct, sizeof (struct ct_data));
275 mem_free(cl, sizeof (CLIENT));
276
277 return ((CLIENT *)NULL);
278 }
279
280 static enum clnt_stat
clnt_vc_call(
282 CLIENT *cl, /* client handle */
283 struct rpc_callextra *ext, /* call metadata */
284 rpcproc_t proc, /* procedure number */
285 struct mbuf *args, /* pointer to args */
286 struct mbuf **resultsp, /* pointer to results */
287 struct timeval utimeout)
288 {
289 struct ct_data *ct = (struct ct_data *) cl->cl_private;
290 AUTH *auth;
291 struct rpc_err *errp;
292 enum clnt_stat stat;
293 XDR xdrs;
294 struct rpc_msg reply_msg;
295 bool_t ok;
296 int nrefreshes = 2; /* number of times to refresh cred */
297 struct timeval timeout;
298 uint32_t xid;
299 struct mbuf *mreq = NULL, *results;
300 struct ct_request *cr;
301 int error, maxextsiz, trycnt;
302 #ifdef KERN_TLS
303 u_int maxlen;
304 #endif
305
306 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
307
308 mtx_lock(&ct->ct_lock);
309
310 if (ct->ct_closing || ct->ct_closed) {
311 mtx_unlock(&ct->ct_lock);
312 free(cr, M_RPC);
313 return (RPC_CANTSEND);
314 }
315 ct->ct_threads++;
316
317 if (ext) {
318 auth = ext->rc_auth;
319 errp = &ext->rc_err;
320 } else {
321 auth = cl->cl_auth;
322 errp = &ct->ct_error;
323 }
324
325 cr->cr_mrep = NULL;
326 cr->cr_error = 0;
327
328 if (ct->ct_wait.tv_usec == -1) {
329 timeout = utimeout; /* use supplied timeout */
330 } else {
331 timeout = ct->ct_wait; /* use default timeout */
332 }
333
334 /*
335 * After 15sec of looping, allow it to return RPC_CANTSEND, which will
336 * cause the clnt_reconnect layer to create a new TCP connection.
337 */
338 trycnt = 15 * hz;
339 call_again:
340 mtx_assert(&ct->ct_lock, MA_OWNED);
341 if (ct->ct_closing || ct->ct_closed) {
342 ct->ct_threads--;
343 wakeup(ct);
344 mtx_unlock(&ct->ct_lock);
345 free(cr, M_RPC);
346 return (RPC_CANTSEND);
347 }
348
349 ct->ct_xid++;
350 xid = ct->ct_xid;
351
352 mtx_unlock(&ct->ct_lock);
353
354 /*
355 * Leave space to pre-pend the record mark.
356 */
357 mreq = m_gethdr(M_WAITOK, MT_DATA);
358 mreq->m_data += sizeof(uint32_t);
359 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
360 ("RPC header too big"));
361 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
362 mreq->m_len = ct->ct_mpos;
363
364 /*
365 * The XID is the first thing in the request.
366 */
367 *mtod(mreq, uint32_t *) = htonl(xid);
368
369 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
370
371 errp->re_status = stat = RPC_SUCCESS;
372
373 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
374 (! AUTH_MARSHALL(auth, xid, &xdrs,
375 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
376 errp->re_status = stat = RPC_CANTENCODEARGS;
377 mtx_lock(&ct->ct_lock);
378 goto out;
379 }
380 mreq->m_pkthdr.len = m_length(mreq, NULL);
381
382 /*
383 * Prepend a record marker containing the packet length.
384 */
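	/*
	 * Per RFC 5531 record marking, the high-order bit of the mark
	 * flags the last fragment of a record and the low 31 bits hold
	 * the fragment length; the request is sent as a single fragment.
	 */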
385 M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK);
386 *mtod(mreq, uint32_t *) =
387 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
388
389 cr->cr_xid = xid;
390 mtx_lock(&ct->ct_lock);
391 /*
392 * Check to see if the other end has already started to close down
393 * the connection. The upcall will have set ct_error.re_status
394 * to RPC_CANTRECV if this is the case.
395 * If the other end starts to close down the connection after this
396 * point, it will be detected later when cr_error is checked,
397 * since the request is in the ct_pending queue.
398 */
399 if (ct->ct_error.re_status == RPC_CANTRECV) {
400 if (errp != &ct->ct_error) {
401 errp->re_errno = ct->ct_error.re_errno;
402 errp->re_status = RPC_CANTRECV;
403 }
404 stat = RPC_CANTRECV;
405 goto out;
406 }
407
408 /* For TLS, wait for an upcall to be done, as required. */
409 while ((ct->ct_rcvstate & (RPCRCVSTATE_NORMAL |
410 RPCRCVSTATE_NONAPPDATA)) == 0)
411 msleep(&ct->ct_rcvstate, &ct->ct_lock, 0, "rpcrcvst", hz);
412
413 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
414 mtx_unlock(&ct->ct_lock);
415
416 if (ct->ct_tlsstate > RPCTLS_NONE) {
417 /*
418 * Copy the mbuf chain to a chain of ext_pgs mbuf(s)
419 * as required by KERN_TLS.
420 */
421 maxextsiz = TLS_MAX_MSG_SIZE_V10_2;
422 #ifdef KERN_TLS
423 if (rpctls_getinfo(&maxlen, false, false))
424 maxextsiz = min(maxextsiz, maxlen);
425 #endif
426 mreq = _rpc_copym_into_ext_pgs(mreq, maxextsiz);
427 }
428 /*
429 * sosend consumes mreq.
430 */
431 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
432 mreq = NULL;
433 if (error == EMSGSIZE || (error == ERESTART &&
434 (ct->ct_waitflag & PCATCH) == 0 && trycnt-- > 0)) {
435 SOCK_SENDBUF_LOCK(ct->ct_socket);
436 sbwait(ct->ct_socket, SO_SND);
437 SOCK_SENDBUF_UNLOCK(ct->ct_socket);
438 AUTH_VALIDATE(auth, xid, NULL, NULL);
439 mtx_lock(&ct->ct_lock);
440 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
441 /* Sleep for 1 clock tick before trying the sosend() again. */
442 mtx_unlock(&ct->ct_lock);
443 pause("rpclpsnd", 1);
444 mtx_lock(&ct->ct_lock);
445 goto call_again;
446 }
447
448 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
449 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
450 reply_msg.acpted_rply.ar_verf.oa_length = 0;
451 reply_msg.acpted_rply.ar_results.where = NULL;
452 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
453
454 mtx_lock(&ct->ct_lock);
455 if (error) {
456 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
457 errp->re_errno = error;
458 errp->re_status = stat = RPC_CANTSEND;
459 goto out;
460 }
461
462 /*
463 * Check to see if we got an upcall while waiting for the
	 * lock. In either case, the request is removed from
	 * ct->ct_pending before the result is handled.
466 */
467 if (cr->cr_error) {
468 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
469 errp->re_errno = cr->cr_error;
470 errp->re_status = stat = RPC_CANTRECV;
471 goto out;
472 }
473 if (cr->cr_mrep) {
474 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
475 goto got_reply;
476 }
477
	/*
	 * Hack to provide rpc-based message passing: a zero timeout means
	 * the caller does not want to wait for a reply (a "batched" call),
	 * so return RPC_TIMEDOUT immediately.
	 */
481 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
482 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
483 errp->re_status = stat = RPC_TIMEDOUT;
484 goto out;
485 }
486
487 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
488 tvtohz(&timeout));
489
490 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
491
492 if (error) {
493 /*
		 * The sleep returned an error, so turn the error code
		 * into an appropriate client status.
497 */
498 errp->re_errno = error;
499 switch (error) {
500 case EINTR:
501 stat = RPC_INTR;
502 break;
503 case EWOULDBLOCK:
504 stat = RPC_TIMEDOUT;
505 break;
506 default:
507 stat = RPC_CANTRECV;
508 }
509 errp->re_status = stat;
510 goto out;
511 } else {
512 /*
513 * We were woken up by the upcall. If the
514 * upcall had a receive error, report that,
515 * otherwise we have a reply.
516 */
517 if (cr->cr_error) {
518 errp->re_errno = cr->cr_error;
519 errp->re_status = stat = RPC_CANTRECV;
520 goto out;
521 }
522 }
523
524 got_reply:
525 /*
526 * Now decode and validate the response. We need to drop the
527 * lock since xdr_replymsg may end up sleeping in malloc.
528 */
529 mtx_unlock(&ct->ct_lock);
530
531 if (ext && ext->rc_feedback)
532 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
533
534 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
535 ok = xdr_replymsg(&xdrs, &reply_msg);
536 cr->cr_mrep = NULL;
537
538 if (ok) {
539 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
540 (reply_msg.acpted_rply.ar_stat == SUCCESS))
541 errp->re_status = stat = RPC_SUCCESS;
542 else
543 stat = _seterr_reply(&reply_msg, errp);
544
545 if (stat == RPC_SUCCESS) {
546 results = xdrmbuf_getall(&xdrs);
547 if (!AUTH_VALIDATE(auth, xid,
548 &reply_msg.acpted_rply.ar_verf,
549 &results)) {
550 errp->re_status = stat = RPC_AUTHERROR;
551 errp->re_why = AUTH_INVALIDRESP;
552 } else {
553 KASSERT(results,
554 ("auth validated but no result"));
555 *resultsp = results;
556 }
557 } /* end successful completion */
558 /*
559 * If unsuccessful AND error is an authentication error
560 * then refresh credentials and try again, else break
561 */
562 else if (stat == RPC_AUTHERROR)
563 /* maybe our credentials need to be refreshed ... */
564 if (nrefreshes > 0 &&
565 AUTH_REFRESH(auth, &reply_msg)) {
566 nrefreshes--;
567 XDR_DESTROY(&xdrs);
568 mtx_lock(&ct->ct_lock);
569 goto call_again;
570 }
571 /* end of unsuccessful completion */
572 } /* end of valid reply message */
573 else {
574 errp->re_status = stat = RPC_CANTDECODERES;
575 }
576 XDR_DESTROY(&xdrs);
577 mtx_lock(&ct->ct_lock);
578 out:
579 mtx_assert(&ct->ct_lock, MA_OWNED);
580
581 KASSERT(stat != RPC_SUCCESS || *resultsp,
582 ("RPC_SUCCESS without reply"));
583
584 if (mreq)
585 m_freem(mreq);
586 if (cr->cr_mrep)
587 m_freem(cr->cr_mrep);
588
589 ct->ct_threads--;
590 if (ct->ct_closing)
591 wakeup(ct);
592
593 mtx_unlock(&ct->ct_lock);
594
595 if (auth && stat != RPC_SUCCESS)
596 AUTH_VALIDATE(auth, xid, NULL, NULL);
597
598 free(cr, M_RPC);
599
600 return (stat);
601 }
602
603 static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
605 {
606 struct ct_data *ct = (struct ct_data *) cl->cl_private;
607
608 *errp = ct->ct_error;
609 }
610
611 static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
613 {
614 XDR xdrs;
615 bool_t dummy;
616
617 xdrs.x_op = XDR_FREE;
618 dummy = (*xdr_res)(&xdrs, res_ptr);
619
620 return (dummy);
621 }
622
623 /*ARGSUSED*/
624 static void
clnt_vc_abort(CLIENT *cl)
626 {
627 }
628
629 static bool_t
clnt_vc_control(CLIENT *cl, u_int request, void *info)
631 {
632 struct ct_data *ct = (struct ct_data *)cl->cl_private;
633 void *infop = info;
634 SVCXPRT *xprt;
635 int error;
636 static u_int thrdnum = 0;
637
638 mtx_lock(&ct->ct_lock);
639
640 switch (request) {
641 case CLSET_FD_CLOSE:
642 ct->ct_closeit = TRUE;
643 mtx_unlock(&ct->ct_lock);
644 return (TRUE);
645 case CLSET_FD_NCLOSE:
646 ct->ct_closeit = FALSE;
647 mtx_unlock(&ct->ct_lock);
648 return (TRUE);
649 default:
650 break;
651 }
652
653 /* for other requests which use info */
654 if (info == NULL) {
655 mtx_unlock(&ct->ct_lock);
656 return (FALSE);
657 }
658 switch (request) {
659 case CLSET_TIMEOUT:
660 if (time_not_ok((struct timeval *)info)) {
661 mtx_unlock(&ct->ct_lock);
662 return (FALSE);
663 }
664 ct->ct_wait = *(struct timeval *)infop;
665 break;
666 case CLGET_TIMEOUT:
667 *(struct timeval *)infop = ct->ct_wait;
668 break;
669 case CLGET_SERVER_ADDR:
670 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
671 break;
672 case CLGET_SVC_ADDR:
673 /*
674 * Slightly different semantics to userland - we use
675 * sockaddr instead of netbuf.
676 */
677 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
678 break;
679 case CLSET_SVC_ADDR: /* set to new address */
680 mtx_unlock(&ct->ct_lock);
681 return (FALSE);
682 case CLGET_XID:
683 *(uint32_t *)info = ct->ct_xid;
684 break;
685 case CLSET_XID:
686 /* This will set the xid of the NEXT call */
687 /* decrement by 1 as clnt_vc_call() increments once */
688 ct->ct_xid = *(uint32_t *)info - 1;
689 break;
690 case CLGET_VERS:
691 /*
692 * This RELIES on the information that, in the call body,
693 * the version number field is the fifth field from the
694 * beginning of the RPC header. MUST be changed if the
695 * call_struct is changed
696 */
697 *(uint32_t *)info =
698 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
699 4 * BYTES_PER_XDR_UNIT));
700 break;
701
702 case CLSET_VERS:
703 *(uint32_t *)(void *)(ct->ct_mcallc +
704 4 * BYTES_PER_XDR_UNIT) =
705 htonl(*(uint32_t *)info);
706 break;
707
708 case CLGET_PROG:
709 /*
710 * This RELIES on the information that, in the call body,
711 * the program number field is the fourth field from the
712 * beginning of the RPC header. MUST be changed if the
713 * call_struct is changed
714 */
715 *(uint32_t *)info =
716 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
717 3 * BYTES_PER_XDR_UNIT));
718 break;
719
720 case CLSET_PROG:
721 *(uint32_t *)(void *)(ct->ct_mcallc +
722 3 * BYTES_PER_XDR_UNIT) =
723 htonl(*(uint32_t *)info);
724 break;
725
726 case CLSET_WAITCHAN:
727 ct->ct_waitchan = (const char *)info;
728 break;
729
730 case CLGET_WAITCHAN:
731 *(const char **) info = ct->ct_waitchan;
732 break;
733
734 case CLSET_INTERRUPTIBLE:
735 if (*(int *) info)
736 ct->ct_waitflag = PCATCH;
737 else
738 ct->ct_waitflag = 0;
739 break;
740
741 case CLGET_INTERRUPTIBLE:
742 if (ct->ct_waitflag)
743 *(int *) info = TRUE;
744 else
745 *(int *) info = FALSE;
746 break;
747
748 case CLSET_BACKCHANNEL:
749 xprt = (SVCXPRT *)info;
750 if (ct->ct_backchannelxprt == NULL) {
751 SVC_ACQUIRE(xprt);
752 xprt->xp_p2 = ct;
753 if (ct->ct_tlsstate > RPCTLS_NONE)
754 xprt->xp_tls = RPCTLS_FLAGS_HANDSHAKE;
755 ct->ct_backchannelxprt = xprt;
756 }
757 break;
758
759 case CLSET_TLS:
760 ct->ct_tlsstate = *(int *)info;
761 if (ct->ct_tlsstate == RPCTLS_COMPLETE) {
762 /* cl ref cnt is released by clnt_vc_dotlsupcall(). */
763 CLNT_ACQUIRE(cl);
764 mtx_unlock(&ct->ct_lock);
765 /* Start the kthread that handles upcalls. */
766 error = kthread_add(clnt_vc_dotlsupcall, cl,
767 NULL, NULL, 0, 0, "krpctls%u", thrdnum++);
768 if (error != 0)
769 panic("Can't add KRPC thread error %d", error);
770 } else
771 mtx_unlock(&ct->ct_lock);
772 return (TRUE);
773
774 case CLSET_BLOCKRCV:
775 if (*(int *) info) {
776 ct->ct_rcvstate &= ~RPCRCVSTATE_NORMAL;
777 ct->ct_rcvstate |= RPCRCVSTATE_TLSHANDSHAKE;
778 } else {
779 ct->ct_rcvstate &= ~RPCRCVSTATE_TLSHANDSHAKE;
780 ct->ct_rcvstate |= RPCRCVSTATE_NORMAL;
781 }
782 break;
783
784 default:
785 mtx_unlock(&ct->ct_lock);
786 return (FALSE);
787 }
788
789 mtx_unlock(&ct->ct_lock);
790 return (TRUE);
791 }
792
793 static void
clnt_vc_close(CLIENT *cl)
795 {
796 struct ct_data *ct = (struct ct_data *) cl->cl_private;
797 struct ct_request *cr;
798
799 mtx_lock(&ct->ct_lock);
800
801 if (ct->ct_closed) {
802 mtx_unlock(&ct->ct_lock);
803 return;
804 }
805
806 if (ct->ct_closing) {
807 while (ct->ct_closing)
808 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
809 KASSERT(ct->ct_closed, ("client should be closed"));
810 mtx_unlock(&ct->ct_lock);
811 return;
812 }
813
814 if (ct->ct_socket) {
815 ct->ct_closing = TRUE;
816 mtx_unlock(&ct->ct_lock);
817
818 SOCK_RECVBUF_LOCK(ct->ct_socket);
819 if (ct->ct_socket->so_rcv.sb_upcall != NULL) {
820 soupcall_clear(ct->ct_socket, SO_RCV);
821 clnt_vc_upcallsdone(ct);
822 }
823 SOCK_RECVBUF_UNLOCK(ct->ct_socket);
824
825 /*
826 * Abort any pending requests and wait until everyone
827 * has finished with clnt_vc_call.
828 */
829 mtx_lock(&ct->ct_lock);
830 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
831 cr->cr_xid = 0;
832 cr->cr_error = ESHUTDOWN;
833 wakeup(cr);
834 }
835
836 while (ct->ct_threads)
837 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
838 }
839
840 ct->ct_closing = FALSE;
841 ct->ct_closed = TRUE;
842 wakeup(&ct->ct_tlsstate);
843 mtx_unlock(&ct->ct_lock);
844 wakeup(ct);
845 }
846
847 static void
clnt_vc_destroy(CLIENT *cl)
849 {
850 struct ct_data *ct = (struct ct_data *) cl->cl_private;
851 struct socket *so;
852 SVCXPRT *xprt;
853 uint32_t reterr;
854
855 clnt_vc_close(cl);
856
857 mtx_lock(&ct->ct_lock);
858 xprt = ct->ct_backchannelxprt;
859 ct->ct_backchannelxprt = NULL;
860 if (xprt != NULL) {
861 mtx_unlock(&ct->ct_lock); /* To avoid a LOR. */
862 sx_xlock(&xprt->xp_lock);
863 mtx_lock(&ct->ct_lock);
864 xprt->xp_p2 = NULL;
865 sx_xunlock(&xprt->xp_lock);
866 SVC_RELEASE(xprt);
867 }
868
869 /* Wait for the upcall kthread to terminate. */
870 while ((ct->ct_rcvstate & RPCRCVSTATE_UPCALLTHREAD) != 0)
871 msleep(&ct->ct_tlsstate, &ct->ct_lock, 0,
872 "clntvccl", hz);
873 mtx_unlock(&ct->ct_lock);
874 mtx_destroy(&ct->ct_lock);
875
876 so = ct->ct_closeit ? ct->ct_socket : NULL;
877 if (so) {
878 /*
879 * If the TLS handshake is in progress, the upcall will fail,
880 * but the socket should be closed by the daemon, since the
881 * connect upcall has just failed. If the upcall fails, the
882 * socket has probably been closed via the rpctlscd daemon
883 * having crashed or been restarted, so ignore return stat.
884 */
885 CURVNET_SET(so->so_vnet);
886 switch (ct->ct_tlsstate) {
887 case RPCTLS_COMPLETE:
888 rpctls_cl_disconnect(so, &reterr);
889 /* FALLTHROUGH */
890 case RPCTLS_INHANDSHAKE:
891 /* Must sorele() to get rid of reference. */
892 sorele(so);
893 CURVNET_RESTORE();
894 break;
895 case RPCTLS_NONE:
896 CURVNET_RESTORE();
897 soshutdown(so, SHUT_WR);
898 soclose(so);
899 break;
900 }
901 }
902 m_freem(ct->ct_record);
903 m_freem(ct->ct_raw);
904 mem_free(ct, sizeof(struct ct_data));
905 if (cl->cl_netid && cl->cl_netid[0])
906 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
907 if (cl->cl_tp && cl->cl_tp[0])
908 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
909 mem_free(cl, sizeof(CLIENT));
910 }
911
912 /*
913 * Make sure that the time is not garbage. -1 value is disallowed.
914 * Note this is different from time_not_ok in clnt_dg.c
915 */
916 static bool_t
time_not_ok(struct timeval *t)
918 {
919 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
920 t->tv_usec <= -1 || t->tv_usec > 1000000);
921 }
922
923 int
clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
925 {
926 struct ct_data *ct = (struct ct_data *) arg;
927 struct uio uio;
928 struct mbuf *m, *m2;
929 struct ct_request *cr;
930 int error, rcvflag, foundreq;
931 uint32_t xid_plus_direction[2], header;
932 SVCXPRT *xprt;
933 struct cf_conn *cd;
934 u_int rawlen;
935 struct cmsghdr *cmsg;
936 struct tls_get_record tgr;
937
938 /*
939 * RPC-over-TLS needs to block reception during
940 * upcalls since the upcall will be doing I/O on
941 * the socket via openssl library calls.
942 */
943 mtx_lock(&ct->ct_lock);
944 if ((ct->ct_rcvstate & (RPCRCVSTATE_NORMAL |
945 RPCRCVSTATE_NONAPPDATA)) == 0) {
946 /* Mark that a socket upcall needs to be done. */
947 if ((ct->ct_rcvstate & (RPCRCVSTATE_UPCALLNEEDED |
948 RPCRCVSTATE_UPCALLINPROG)) != 0)
949 ct->ct_rcvstate |= RPCRCVSTATE_SOUPCALLNEEDED;
950 mtx_unlock(&ct->ct_lock);
951 return (SU_OK);
952 }
953 mtx_unlock(&ct->ct_lock);
954
955 /*
956 * If another thread is already here, it must be in
957 * soreceive(), so just return to avoid races with it.
958 * ct_upcallrefs is protected by the socket receive buffer lock
959 * which is held in this function, except when
960 * soreceive() is called.
961 */
962 if (ct->ct_upcallrefs > 0)
963 return (SU_OK);
964 ct->ct_upcallrefs++;
965
966 /*
967 * Read as much as possible off the socket and link it
968 * onto ct_raw.
969 */
970 for (;;) {
971 uio.uio_resid = 1000000000;
972 uio.uio_td = curthread;
973 m2 = m = NULL;
974 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
975 if (ct->ct_tlsstate > RPCTLS_NONE && (ct->ct_rcvstate &
976 RPCRCVSTATE_NORMAL) != 0)
977 rcvflag |= MSG_TLSAPPDATA;
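		/*
		 * MSG_TLSAPPDATA asks soreceive() to return ENXIO rather
		 * than delivering a non-application-data TLS record, so
		 * that alert/handshake records can be handed to the
		 * rpctlscd(8) daemon (see the ENXIO handling below).
		 */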
978 SOCK_RECVBUF_UNLOCK(so);
979 error = soreceive(so, NULL, &uio, &m, &m2, &rcvflag);
980 SOCK_RECVBUF_LOCK(so);
981
982 if (error == EWOULDBLOCK) {
983 /*
984 * We must re-test for readability after
985 * taking the lock to protect us in the case
986 * where a new packet arrives on the socket
987 * after our call to soreceive fails with
988 * EWOULDBLOCK.
989 */
990 error = 0;
991 if (!soreadable(so))
992 break;
993 continue;
994 }
995 if (error == 0 && m == NULL) {
996 /*
997 * We must have got EOF trying
998 * to read from the stream.
999 */
1000 error = ECONNRESET;
1001 }
1002
1003 /*
1004 * A return of ENXIO indicates that there is an
1005 * alert record at the head of the
1006 * socket's receive queue, for TLS connections.
1007 * This record needs to be handled in userland
1008 * via an SSL_read() call, so do an upcall to the daemon.
1009 */
1010 if (ct->ct_tlsstate > RPCTLS_NONE && error == ENXIO) {
1011 /* Disable reception, marking an upcall needed. */
1012 mtx_lock(&ct->ct_lock);
1013 ct->ct_rcvstate |= RPCRCVSTATE_UPCALLNEEDED;
1014 /*
			 * If an upcall is needed, wake up the kthread
1016 * that runs clnt_vc_dotlsupcall().
1017 */
1018 wakeup(&ct->ct_tlsstate);
1019 mtx_unlock(&ct->ct_lock);
1020 break;
1021 }
1022 if (error != 0)
1023 break;
1024
1025 /* Process any record header(s). */
1026 if (m2 != NULL) {
1027 cmsg = mtod(m2, struct cmsghdr *);
1028 if (cmsg->cmsg_type == TLS_GET_RECORD &&
1029 cmsg->cmsg_len == CMSG_LEN(sizeof(tgr))) {
1030 memcpy(&tgr, CMSG_DATA(cmsg), sizeof(tgr));
1031 /*
1032 * TLS_RLTYPE_ALERT records should be handled
1033 * since soreceive() would have returned
1034 * ENXIO. Just throw any other
1035 * non-TLS_RLTYPE_APP records away.
1036 */
1037 if (tgr.tls_type != TLS_RLTYPE_APP) {
1038 m_freem(m);
1039 m_free(m2);
1040 mtx_lock(&ct->ct_lock);
1041 ct->ct_rcvstate &=
1042 ~RPCRCVSTATE_NONAPPDATA;
1043 ct->ct_rcvstate |= RPCRCVSTATE_NORMAL;
1044 mtx_unlock(&ct->ct_lock);
1045 continue;
1046 }
1047 }
1048 m_free(m2);
1049 }
1050
1051 if (ct->ct_raw != NULL)
1052 m_last(ct->ct_raw)->m_next = m;
1053 else
1054 ct->ct_raw = m;
1055 }
1056 rawlen = m_length(ct->ct_raw, NULL);
1057
1058 /* Now, process as much of ct_raw as possible. */
1059 for (;;) {
1060 /*
1061 * If ct_record_resid is zero, we are waiting for a
1062 * record mark.
1063 */
1064 if (ct->ct_record_resid == 0) {
1065 if (rawlen < sizeof(uint32_t))
1066 break;
1067 m_copydata(ct->ct_raw, 0, sizeof(uint32_t),
1068 (char *)&header);
1069 header = ntohl(header);
1070 ct->ct_record_resid = header & 0x7fffffff;
1071 ct->ct_record_eor = ((header & 0x80000000) != 0);
1072 m_adj(ct->ct_raw, sizeof(uint32_t));
1073 rawlen -= sizeof(uint32_t);
1074 } else {
1075 /*
1076 * Move as much of the record as possible to
1077 * ct_record.
1078 */
1079 if (rawlen == 0)
1080 break;
1081 if (rawlen <= ct->ct_record_resid) {
1082 if (ct->ct_record != NULL)
1083 m_last(ct->ct_record)->m_next =
1084 ct->ct_raw;
1085 else
1086 ct->ct_record = ct->ct_raw;
1087 ct->ct_raw = NULL;
1088 ct->ct_record_resid -= rawlen;
1089 rawlen = 0;
1090 } else {
1091 m = m_split(ct->ct_raw, ct->ct_record_resid,
1092 M_NOWAIT);
1093 if (m == NULL)
1094 break;
1095 if (ct->ct_record != NULL)
1096 m_last(ct->ct_record)->m_next =
1097 ct->ct_raw;
1098 else
1099 ct->ct_record = ct->ct_raw;
1100 rawlen -= ct->ct_record_resid;
1101 ct->ct_record_resid = 0;
1102 ct->ct_raw = m;
1103 }
1104 if (ct->ct_record_resid > 0)
1105 break;
1106
1107 /*
1108 * If we have the entire record, see if we can
1109 * match it to a request.
1110 */
1111 if (ct->ct_record_eor) {
1112 /*
1113 * The XID is in the first uint32_t of
1114 * the reply and the message direction
1115 * is the second one.
1116 */
1117 if (ct->ct_record->m_len <
1118 sizeof(xid_plus_direction) &&
1119 m_length(ct->ct_record, NULL) <
1120 sizeof(xid_plus_direction)) {
1121 /*
1122 * What to do now?
1123 * The data in the TCP stream is
1124 * corrupted such that there is no
1125 * valid RPC message to parse.
1126 * I think it best to close this
1127 * connection and allow
1128 * clnt_reconnect_call() to try
1129 * and establish a new one.
1130 */
1131 printf("clnt_vc_soupcall: "
1132 "connection data corrupted\n");
1133 error = ECONNRESET;
1134 goto wakeup_all;
1135 }
1136 m_copydata(ct->ct_record, 0,
1137 sizeof(xid_plus_direction),
1138 (char *)xid_plus_direction);
1139 xid_plus_direction[0] =
1140 ntohl(xid_plus_direction[0]);
1141 xid_plus_direction[1] =
1142 ntohl(xid_plus_direction[1]);
1143 /* Check message direction. */
1144 if (xid_plus_direction[1] == CALL) {
1145 /* This is a backchannel request. */
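					/*
					 * (NFSv4.1/4.2 sessions use this:
					 * the server sends callback RPCs
					 * over the same TCP connection and
					 * they are queued onto the
					 * backchannel SVCXPRT registered
					 * via CLSET_BACKCHANNEL.)
					 */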
1146 mtx_lock(&ct->ct_lock);
1147 xprt = ct->ct_backchannelxprt;
1148 if (xprt == NULL) {
1149 mtx_unlock(&ct->ct_lock);
1150 /* Just throw it away. */
1151 m_freem(ct->ct_record);
1152 ct->ct_record = NULL;
1153 } else {
1154 cd = (struct cf_conn *)
1155 xprt->xp_p1;
1156 m2 = cd->mreq;
1157 /*
1158 * The requests are chained
1159 * in the m_nextpkt list.
1160 */
1161 while (m2 != NULL &&
1162 m2->m_nextpkt != NULL)
1163 /* Find end of list. */
1164 m2 = m2->m_nextpkt;
1165 if (m2 != NULL)
1166 m2->m_nextpkt =
1167 ct->ct_record;
1168 else
1169 cd->mreq =
1170 ct->ct_record;
1171 ct->ct_record->m_nextpkt =
1172 NULL;
1173 ct->ct_record = NULL;
1174 xprt_active(xprt);
1175 mtx_unlock(&ct->ct_lock);
1176 }
1177 } else {
1178 mtx_lock(&ct->ct_lock);
1179 foundreq = 0;
1180 TAILQ_FOREACH(cr, &ct->ct_pending,
1181 cr_link) {
1182 if (cr->cr_xid ==
1183 xid_plus_direction[0]) {
1184 /*
1185 * This one
1186 * matches. We leave
1187 * the reply mbuf in
1188 * cr->cr_mrep. Set
1189 * the XID to zero so
1190 * that we will ignore
1191 * any duplicated
1192 * replies.
1193 */
1194 cr->cr_xid = 0;
1195 cr->cr_mrep =
1196 ct->ct_record;
1197 cr->cr_error = 0;
1198 foundreq = 1;
1199 wakeup(cr);
1200 break;
1201 }
1202 }
1203 mtx_unlock(&ct->ct_lock);
1204
1205 if (!foundreq)
1206 m_freem(ct->ct_record);
1207 ct->ct_record = NULL;
1208 }
1209 }
1210 }
1211 }
1212
1213 if (error != 0) {
1214 wakeup_all:
1215 /*
1216 * This socket is broken, so mark that it cannot
1217 * receive and fail all RPCs waiting for a reply
1218 * on it, so that they will be retried on a new
1219 * TCP connection created by clnt_reconnect_X().
1220 */
1221 mtx_lock(&ct->ct_lock);
1222 ct->ct_error.re_status = RPC_CANTRECV;
1223 ct->ct_error.re_errno = error;
1224 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
1225 cr->cr_error = error;
1226 wakeup(cr);
1227 }
1228 mtx_unlock(&ct->ct_lock);
1229 }
1230
1231 ct->ct_upcallrefs--;
1232 if (ct->ct_upcallrefs < 0)
1233 panic("rpcvc upcall refcnt");
1234 if (ct->ct_upcallrefs == 0)
1235 wakeup(&ct->ct_upcallrefs);
1236 return (SU_OK);
1237 }
1238
1239 /*
1240 * Wait for all upcalls in progress to complete.
1241 */
1242 static void
clnt_vc_upcallsdone(struct ct_data *ct)
1244 {
1245
1246 SOCK_RECVBUF_LOCK_ASSERT(ct->ct_socket);
1247
1248 while (ct->ct_upcallrefs > 0)
1249 (void) msleep(&ct->ct_upcallrefs,
1250 SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1251 }
1252
1253 /*
1254 * Do a TLS upcall to the rpctlscd daemon, as required.
1255 * This function runs as a kthread.
1256 */
1257 static void
clnt_vc_dotlsupcall(void *data)
1259 {
1260 CLIENT *cl = (CLIENT *)data;
1261 struct ct_data *ct = (struct ct_data *)cl->cl_private;
1262 enum clnt_stat ret;
1263 uint32_t reterr;
1264
1265 CURVNET_SET(ct->ct_socket->so_vnet);
1266 mtx_lock(&ct->ct_lock);
1267 ct->ct_rcvstate |= RPCRCVSTATE_UPCALLTHREAD;
1268 while (!ct->ct_closed) {
1269 if ((ct->ct_rcvstate & RPCRCVSTATE_UPCALLNEEDED) != 0) {
1270 ct->ct_rcvstate &= ~RPCRCVSTATE_UPCALLNEEDED;
1271 ct->ct_rcvstate |= RPCRCVSTATE_UPCALLINPROG;
1272 if (ct->ct_tlsstate == RPCTLS_COMPLETE) {
1273 mtx_unlock(&ct->ct_lock);
1274 ret = rpctls_cl_handlerecord(ct->ct_socket,
1275 &reterr);
1276 mtx_lock(&ct->ct_lock);
1277 }
1278 ct->ct_rcvstate &= ~RPCRCVSTATE_UPCALLINPROG;
1279 if (ret == RPC_SUCCESS && reterr == RPCTLSERR_OK)
1280 ct->ct_rcvstate |= RPCRCVSTATE_NORMAL;
1281 else
1282 ct->ct_rcvstate |= RPCRCVSTATE_NONAPPDATA;
1283 wakeup(&ct->ct_rcvstate);
1284 }
1285 if ((ct->ct_rcvstate & RPCRCVSTATE_SOUPCALLNEEDED) != 0) {
1286 ct->ct_rcvstate &= ~RPCRCVSTATE_SOUPCALLNEEDED;
1287 mtx_unlock(&ct->ct_lock);
1288 SOCK_RECVBUF_LOCK(ct->ct_socket);
1289 clnt_vc_soupcall(ct->ct_socket, ct, M_NOWAIT);
1290 SOCK_RECVBUF_UNLOCK(ct->ct_socket);
1291 mtx_lock(&ct->ct_lock);
1292 }
1293 msleep(&ct->ct_tlsstate, &ct->ct_lock, 0, "clntvcdu", hz);
1294 }
1295 ct->ct_rcvstate &= ~RPCRCVSTATE_UPCALLTHREAD;
1296 wakeup(&ct->ct_tlsstate);
1297 mtx_unlock(&ct->ct_lock);
1298 CLNT_RELEASE(cl);
1299 CURVNET_RESTORE();
1300 kthread_exit();
1301 }
1302