1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 /* Copyright (c) 1990 Mentat Inc. */ 26 27 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 28 /* All Rights Reserved */ 29 30 /* 31 * Kernel RPC filtering module 32 */ 33 34 #include <sys/param.h> 35 #include <sys/types.h> 36 #include <sys/stream.h> 37 #include <sys/stropts.h> 38 #include <sys/tihdr.h> 39 #include <sys/timod.h> 40 #include <sys/tiuser.h> 41 #include <sys/debug.h> 42 #include <sys/signal.h> 43 #include <sys/pcb.h> 44 #include <sys/user.h> 45 #include <sys/errno.h> 46 #include <sys/cred.h> 47 #include <sys/policy.h> 48 #include <sys/inline.h> 49 #include <sys/cmn_err.h> 50 #include <sys/kmem.h> 51 #include <sys/file.h> 52 #include <sys/sysmacros.h> 53 #include <sys/systm.h> 54 #include <sys/t_lock.h> 55 #include <sys/ddi.h> 56 #include <sys/vtrace.h> 57 #include <sys/callb.h> 58 #include <sys/strsun.h> 59 60 #include <sys/strlog.h> 61 #include <rpc/rpc_com.h> 62 #include <inet/common.h> 63 #include <rpc/types.h> 64 #include <sys/time.h> 65 #include <rpc/xdr.h> 66 #include <rpc/auth.h> 67 #include <rpc/clnt.h> 68 #include <rpc/rpc_msg.h> 69 #include <rpc/clnt.h> 70 #include <rpc/svc.h> 71 #include <rpc/rpcsys.h> 72 #include <rpc/rpc_rdma.h> 73 74 /* 75 * This is the loadable module wrapper. 76 */ 77 #include <sys/conf.h> 78 #include <sys/modctl.h> 79 #include <sys/syscall.h> 80 81 extern struct streamtab rpcinfo; 82 83 static struct fmodsw fsw = { 84 "rpcmod", 85 &rpcinfo, 86 D_NEW|D_MP, 87 }; 88 89 /* 90 * Module linkage information for the kernel. 91 */ 92 93 static struct modlstrmod modlstrmod = { 94 &mod_strmodops, "rpc interface str mod", &fsw 95 }; 96 97 /* 98 * For the RPC system call. 99 */ 100 static struct sysent rpcsysent = { 101 2, 102 SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD, 103 rpcsys 104 }; 105 106 static struct modlsys modlsys = { 107 &mod_syscallops, 108 "RPC syscall", 109 &rpcsysent 110 }; 111 112 #ifdef _SYSCALL32_IMPL 113 static struct modlsys modlsys32 = { 114 &mod_syscallops32, 115 "32-bit RPC syscall", 116 &rpcsysent 117 }; 118 #endif /* _SYSCALL32_IMPL */ 119 120 static struct modlinkage modlinkage = { 121 MODREV_1, 122 { 123 &modlsys, 124 #ifdef _SYSCALL32_IMPL 125 &modlsys32, 126 #endif 127 &modlstrmod, 128 NULL 129 } 130 }; 131 132 int 133 _init(void) 134 { 135 int error = 0; 136 callb_id_t cid; 137 int status; 138 139 svc_init(); 140 clnt_init(); 141 cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc"); 142 143 if (error = mod_install(&modlinkage)) { 144 /* 145 * Could not install module, cleanup previous 146 * initialization work. 147 */ 148 clnt_fini(); 149 if (cid != NULL) 150 (void) callb_delete(cid); 151 152 return (error); 153 } 154 155 /* 156 * Load up the RDMA plugins and initialize the stats. Even if the 157 * plugins loadup fails, but rpcmod was successfully installed the 158 * counters still get initialized. 159 */ 160 rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL); 161 mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL); 162 mt_kstat_init(); 163 164 /* 165 * Get our identification into ldi. This is used for loading 166 * other modules, e.g. rpcib. 167 */ 168 status = ldi_ident_from_mod(&modlinkage, &rpcmod_li); 169 if (status != 0) { 170 cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status); 171 rpcmod_li = NULL; 172 } 173 174 return (error); 175 } 176 177 /* 178 * The unload entry point fails, because we advertise entry points into 179 * rpcmod from the rest of kRPC: rpcmod_release(). 180 */ 181 int 182 _fini(void) 183 { 184 return (EBUSY); 185 } 186 187 int 188 _info(struct modinfo *modinfop) 189 { 190 return (mod_info(&modlinkage, modinfop)); 191 } 192 193 extern int nulldev(); 194 195 #define RPCMOD_ID 2049 196 197 int rmm_open(), rmm_close(); 198 199 /* 200 * To save instructions, since STREAMS ignores the return value 201 * from these functions, they are defined as void here. Kind of icky, but... 202 */ 203 void rmm_rput(queue_t *, mblk_t *); 204 void rmm_wput(queue_t *, mblk_t *); 205 void rmm_rsrv(queue_t *); 206 void rmm_wsrv(queue_t *); 207 208 int rpcmodopen(), rpcmodclose(); 209 void rpcmodrput(), rpcmodwput(); 210 void rpcmodrsrv(), rpcmodwsrv(); 211 212 static void rpcmodwput_other(queue_t *, mblk_t *); 213 static int mir_close(queue_t *q); 214 static int mir_open(queue_t *q, dev_t *devp, int flag, int sflag, 215 cred_t *credp); 216 static void mir_rput(queue_t *q, mblk_t *mp); 217 static void mir_rsrv(queue_t *q); 218 static void mir_wput(queue_t *q, mblk_t *mp); 219 static void mir_wsrv(queue_t *q); 220 221 static struct module_info rpcmod_info = 222 {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024}; 223 224 /* 225 * Read side has no service procedure. 226 */ 227 static struct qinit rpcmodrinit = { 228 (int (*)())rmm_rput, 229 (int (*)())rmm_rsrv, 230 rmm_open, 231 rmm_close, 232 nulldev, 233 &rpcmod_info, 234 NULL 235 }; 236 237 /* 238 * The write put procedure is simply putnext to conserve stack space. 239 * The write service procedure is not used to queue data, but instead to 240 * synchronize with flow control. 241 */ 242 static struct qinit rpcmodwinit = { 243 (int (*)())rmm_wput, 244 (int (*)())rmm_wsrv, 245 rmm_open, 246 rmm_close, 247 nulldev, 248 &rpcmod_info, 249 NULL 250 }; 251 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL }; 252 253 struct xprt_style_ops { 254 int (*xo_open)(); 255 int (*xo_close)(); 256 void (*xo_wput)(); 257 void (*xo_wsrv)(); 258 void (*xo_rput)(); 259 void (*xo_rsrv)(); 260 }; 261 262 static struct xprt_style_ops xprt_clts_ops = { 263 rpcmodopen, 264 rpcmodclose, 265 rpcmodwput, 266 rpcmodwsrv, 267 rpcmodrput, 268 NULL 269 }; 270 271 static struct xprt_style_ops xprt_cots_ops = { 272 mir_open, 273 mir_close, 274 mir_wput, 275 mir_wsrv, 276 mir_rput, 277 mir_rsrv 278 }; 279 280 /* 281 * Per rpcmod "slot" data structure. q->q_ptr points to one of these. 282 */ 283 struct rpcm { 284 void *rm_krpc_cell; /* Reserved for use by KRPC */ 285 struct xprt_style_ops *rm_ops; 286 int rm_type; /* Client or server side stream */ 287 #define RM_CLOSING 0x1 /* somebody is trying to close slot */ 288 uint_t rm_state; /* state of the slot. see above */ 289 uint_t rm_ref; /* cnt of external references to slot */ 290 kmutex_t rm_lock; /* mutex protecting above fields */ 291 kcondvar_t rm_cwait; /* condition for closing */ 292 zoneid_t rm_zoneid; /* zone which pushed rpcmod */ 293 }; 294 295 struct temp_slot { 296 void *cell; 297 struct xprt_style_ops *ops; 298 int type; 299 mblk_t *info_ack; 300 kmutex_t lock; 301 kcondvar_t wait; 302 }; 303 304 typedef struct mir_s { 305 void *mir_krpc_cell; /* Reserved for KRPC use. This field */ 306 /* must be first in the structure. */ 307 struct xprt_style_ops *rm_ops; 308 int mir_type; /* Client or server side stream */ 309 310 mblk_t *mir_head_mp; /* RPC msg in progress */ 311 /* 312 * mir_head_mp points the first mblk being collected in 313 * the current RPC message. Record headers are removed 314 * before data is linked into mir_head_mp. 315 */ 316 mblk_t *mir_tail_mp; /* Last mblk in mir_head_mp */ 317 /* 318 * mir_tail_mp points to the last mblk in the message 319 * chain starting at mir_head_mp. It is only valid 320 * if mir_head_mp is non-NULL and is used to add new 321 * data blocks to the end of chain quickly. 322 */ 323 324 int32_t mir_frag_len; /* Bytes seen in the current frag */ 325 /* 326 * mir_frag_len starts at -4 for beginning of each fragment. 327 * When this length is negative, it indicates the number of 328 * bytes that rpcmod needs to complete the record marker 329 * header. When it is positive or zero, it holds the number 330 * of bytes that have arrived for the current fragment and 331 * are held in mir_header_mp. 332 */ 333 334 int32_t mir_frag_header; 335 /* 336 * Fragment header as collected for the current fragment. 337 * It holds the last-fragment indicator and the number 338 * of bytes in the fragment. 339 */ 340 341 unsigned int 342 mir_ordrel_pending : 1, /* Sent T_ORDREL_REQ */ 343 mir_hold_inbound : 1, /* Hold inbound messages on server */ 344 /* side until outbound flow control */ 345 /* is relieved. */ 346 mir_closing : 1, /* The stream is being closed */ 347 mir_inrservice : 1, /* data queued or rd srv proc running */ 348 mir_inwservice : 1, /* data queued or wr srv proc running */ 349 mir_inwflushdata : 1, /* flush M_DATAs when srv runs */ 350 /* 351 * On client streams, mir_clntreq is 0 or 1; it is set 352 * to 1 whenever a new request is sent out (mir_wput) 353 * and cleared when the timer fires (mir_timer). If 354 * the timer fires with this value equal to 0, then the 355 * stream is considered idle and KRPC is notified. 356 */ 357 mir_clntreq : 1, 358 /* 359 * On server streams, stop accepting messages 360 */ 361 mir_svc_no_more_msgs : 1, 362 mir_listen_stream : 1, /* listen end point */ 363 mir_unused : 1, /* no longer used */ 364 mir_timer_call : 1, 365 mir_junk_fill_thru_bit_31 : 21; 366 367 int mir_setup_complete; /* server has initialized everything */ 368 timeout_id_t mir_timer_id; /* Timer for idle checks */ 369 clock_t mir_idle_timeout; /* Allowed idle time before shutdown */ 370 /* 371 * This value is copied from clnt_idle_timeout or 372 * svc_idle_timeout during the appropriate ioctl. 373 * Kept in milliseconds 374 */ 375 clock_t mir_use_timestamp; /* updated on client with each use */ 376 /* 377 * This value is set to lbolt 378 * every time a client stream sends or receives data. 379 * Even if the timer message arrives, we don't shutdown 380 * client unless: 381 * lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp. 382 * This value is kept in HZ. 383 */ 384 385 uint_t *mir_max_msg_sizep; /* Reference to sanity check size */ 386 /* 387 * This pointer is set to &clnt_max_msg_size or 388 * &svc_max_msg_size during the appropriate ioctl. 389 */ 390 zoneid_t mir_zoneid; /* zone which pushed rpcmod */ 391 /* Server-side fields. */ 392 int mir_ref_cnt; /* Reference count: server side only */ 393 /* counts the number of references */ 394 /* that a kernel RPC server thread */ 395 /* (see svc_run()) has on this rpcmod */ 396 /* slot. Effectively, it is the */ 397 /* number * of unprocessed messages */ 398 /* that have been passed up to the */ 399 /* KRPC layer */ 400 401 mblk_t *mir_svc_pend_mp; /* Pending T_ORDREL_IND or */ 402 /* T_DISCON_IND */ 403 404 /* 405 * these fields are for both client and server, but for debugging, 406 * it is easier to have these last in the structure. 407 */ 408 kmutex_t mir_mutex; /* Mutex and condvar for close */ 409 kcondvar_t mir_condvar; /* synchronization. */ 410 kcondvar_t mir_timer_cv; /* Timer routine sync. */ 411 } mir_t; 412 413 void tmp_rput(queue_t *q, mblk_t *mp); 414 415 struct xprt_style_ops tmpops = { 416 NULL, 417 NULL, 418 putnext, 419 NULL, 420 tmp_rput, 421 NULL 422 }; 423 424 void 425 tmp_rput(queue_t *q, mblk_t *mp) 426 { 427 struct temp_slot *t = (struct temp_slot *)(q->q_ptr); 428 struct T_info_ack *pptr; 429 430 switch (mp->b_datap->db_type) { 431 case M_PCPROTO: 432 pptr = (struct T_info_ack *)mp->b_rptr; 433 switch (pptr->PRIM_type) { 434 case T_INFO_ACK: 435 mutex_enter(&t->lock); 436 t->info_ack = mp; 437 cv_signal(&t->wait); 438 mutex_exit(&t->lock); 439 return; 440 default: 441 break; 442 } 443 default: 444 break; 445 } 446 447 /* 448 * Not an info-ack, so free it. This is ok because we should 449 * not be receiving data until the open finishes: rpcmod 450 * is pushed well before the end-point is bound to an address. 451 */ 452 freemsg(mp); 453 } 454 455 int 456 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp) 457 { 458 mblk_t *bp; 459 struct temp_slot ts, *t; 460 struct T_info_ack *pptr; 461 int error = 0; 462 463 ASSERT(q != NULL); 464 /* 465 * Check for re-opens. 466 */ 467 if (q->q_ptr) { 468 TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, 469 "rpcmodopen_end:(%s)", "q->qptr"); 470 return (0); 471 } 472 473 t = &ts; 474 bzero(t, sizeof (*t)); 475 q->q_ptr = (void *)t; 476 WR(q)->q_ptr = (void *)t; 477 478 /* 479 * Allocate the required messages upfront. 480 */ 481 if ((bp = allocb(sizeof (struct T_info_req) + 482 sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) { 483 return (ENOBUFS); 484 } 485 486 mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL); 487 cv_init(&t->wait, NULL, CV_DEFAULT, NULL); 488 489 t->ops = &tmpops; 490 491 qprocson(q); 492 bp->b_datap->db_type = M_PCPROTO; 493 *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ; 494 bp->b_wptr += sizeof (struct T_info_req); 495 putnext(WR(q), bp); 496 497 mutex_enter(&t->lock); 498 while (t->info_ack == NULL) { 499 if (cv_wait_sig(&t->wait, &t->lock) == 0) { 500 error = EINTR; 501 break; 502 } 503 } 504 mutex_exit(&t->lock); 505 506 if (error) 507 goto out; 508 509 pptr = (struct T_info_ack *)t->info_ack->b_rptr; 510 511 if (pptr->SERV_type == T_CLTS) { 512 if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0) 513 ((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops; 514 } else { 515 if ((error = mir_open(q, devp, flag, sflag, crp)) == 0) 516 ((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops; 517 } 518 519 out: 520 if (error) 521 qprocsoff(q); 522 523 freemsg(t->info_ack); 524 mutex_destroy(&t->lock); 525 cv_destroy(&t->wait); 526 527 return (error); 528 } 529 530 void 531 rmm_rput(queue_t *q, mblk_t *mp) 532 { 533 (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp); 534 } 535 536 void 537 rmm_rsrv(queue_t *q) 538 { 539 (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q); 540 } 541 542 void 543 rmm_wput(queue_t *q, mblk_t *mp) 544 { 545 (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp); 546 } 547 548 void 549 rmm_wsrv(queue_t *q) 550 { 551 (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q); 552 } 553 554 int 555 rmm_close(queue_t *q, int flag, cred_t *crp) 556 { 557 return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp)); 558 } 559 560 /* 561 * rpcmodopen - open routine gets called when the module gets pushed 562 * onto the stream. 563 */ 564 /*ARGSUSED*/ 565 int 566 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp) 567 { 568 struct rpcm *rmp; 569 570 extern void (*rpc_rele)(queue_t *, mblk_t *); 571 static void rpcmod_release(queue_t *, mblk_t *); 572 573 TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:"); 574 575 /* 576 * Initialize entry points to release a rpcmod slot (and an input 577 * message if supplied) and to send an output message to the module 578 * below rpcmod. 579 */ 580 if (rpc_rele == NULL) 581 rpc_rele = rpcmod_release; 582 583 /* 584 * Only sufficiently privileged users can use this module, and it 585 * is assumed that they will use this module properly, and NOT send 586 * bulk data from downstream. 587 */ 588 if (secpolicy_rpcmod_open(crp) != 0) 589 return (EPERM); 590 591 /* 592 * Allocate slot data structure. 593 */ 594 rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP); 595 596 mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL); 597 cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL); 598 rmp->rm_zoneid = rpc_zoneid(); 599 /* 600 * slot type will be set by kRPC client and server ioctl's 601 */ 602 rmp->rm_type = 0; 603 604 q->q_ptr = (void *)rmp; 605 WR(q)->q_ptr = (void *)rmp; 606 607 TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end"); 608 return (0); 609 } 610 611 /* 612 * rpcmodclose - This routine gets called when the module gets popped 613 * off of the stream. 614 */ 615 /*ARGSUSED*/ 616 int 617 rpcmodclose(queue_t *q, int flag, cred_t *crp) 618 { 619 struct rpcm *rmp; 620 621 ASSERT(q != NULL); 622 rmp = (struct rpcm *)q->q_ptr; 623 624 /* 625 * Mark our state as closing. 626 */ 627 mutex_enter(&rmp->rm_lock); 628 rmp->rm_state |= RM_CLOSING; 629 630 /* 631 * Check and see if there are any messages on the queue. If so, send 632 * the messages, regardless whether the downstream module is ready to 633 * accept data. 634 */ 635 if (rmp->rm_type == RPC_SERVER) { 636 flushq(q, FLUSHDATA); 637 638 qenable(WR(q)); 639 640 if (rmp->rm_ref) { 641 mutex_exit(&rmp->rm_lock); 642 /* 643 * call into SVC to clean the queue 644 */ 645 svc_queueclean(q); 646 mutex_enter(&rmp->rm_lock); 647 648 /* 649 * Block while there are kRPC threads with a reference 650 * to this message. 651 */ 652 while (rmp->rm_ref) 653 cv_wait(&rmp->rm_cwait, &rmp->rm_lock); 654 } 655 656 mutex_exit(&rmp->rm_lock); 657 658 /* 659 * It is now safe to remove this queue from the stream. No kRPC 660 * threads have a reference to the stream, and none ever will, 661 * because RM_CLOSING is set. 662 */ 663 qprocsoff(q); 664 665 /* Notify kRPC that this stream is going away. */ 666 svc_queueclose(q); 667 } else { 668 mutex_exit(&rmp->rm_lock); 669 qprocsoff(q); 670 } 671 672 q->q_ptr = NULL; 673 WR(q)->q_ptr = NULL; 674 mutex_destroy(&rmp->rm_lock); 675 cv_destroy(&rmp->rm_cwait); 676 kmem_free(rmp, sizeof (*rmp)); 677 return (0); 678 } 679 680 #ifdef DEBUG 681 int rpcmod_send_msg_up = 0; 682 int rpcmod_send_uderr = 0; 683 int rpcmod_send_dup = 0; 684 int rpcmod_send_dup_cnt = 0; 685 #endif 686 687 /* 688 * rpcmodrput - Module read put procedure. This is called from 689 * the module, driver, or stream head downstream. 690 */ 691 void 692 rpcmodrput(queue_t *q, mblk_t *mp) 693 { 694 struct rpcm *rmp; 695 union T_primitives *pptr; 696 int hdrsz; 697 698 TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:"); 699 700 ASSERT(q != NULL); 701 rmp = (struct rpcm *)q->q_ptr; 702 703 if (rmp->rm_type == 0) { 704 freemsg(mp); 705 return; 706 } 707 708 #ifdef DEBUG 709 if (rpcmod_send_msg_up > 0) { 710 mblk_t *nmp = copymsg(mp); 711 if (nmp) { 712 putnext(q, nmp); 713 rpcmod_send_msg_up--; 714 } 715 } 716 if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) { 717 mblk_t *nmp; 718 struct T_unitdata_ind *data; 719 struct T_uderror_ind *ud; 720 int d; 721 data = (struct T_unitdata_ind *)mp->b_rptr; 722 if (data->PRIM_type == T_UNITDATA_IND) { 723 d = sizeof (*ud) - sizeof (*data); 724 nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI); 725 if (nmp) { 726 ud = (struct T_uderror_ind *)nmp->b_rptr; 727 ud->PRIM_type = T_UDERROR_IND; 728 ud->DEST_length = data->SRC_length; 729 ud->DEST_offset = data->SRC_offset + d; 730 ud->OPT_length = data->OPT_length; 731 ud->OPT_offset = data->OPT_offset + d; 732 ud->ERROR_type = ENETDOWN; 733 if (data->SRC_length) { 734 bcopy(mp->b_rptr + 735 data->SRC_offset, 736 nmp->b_rptr + 737 ud->DEST_offset, 738 data->SRC_length); 739 } 740 if (data->OPT_length) { 741 bcopy(mp->b_rptr + 742 data->OPT_offset, 743 nmp->b_rptr + 744 ud->OPT_offset, 745 data->OPT_length); 746 } 747 nmp->b_wptr += d; 748 nmp->b_wptr += (mp->b_wptr - mp->b_rptr); 749 nmp->b_datap->db_type = M_PROTO; 750 putnext(q, nmp); 751 rpcmod_send_uderr--; 752 } 753 } 754 } 755 #endif 756 switch (mp->b_datap->db_type) { 757 default: 758 putnext(q, mp); 759 break; 760 761 case M_PROTO: 762 case M_PCPROTO: 763 ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t)); 764 pptr = (union T_primitives *)mp->b_rptr; 765 766 /* 767 * Forward this message to krpc if it is data. 768 */ 769 if (pptr->type == T_UNITDATA_IND) { 770 mblk_t *nmp; 771 772 /* 773 * Check if the module is being popped. 774 */ 775 mutex_enter(&rmp->rm_lock); 776 if (rmp->rm_state & RM_CLOSING) { 777 mutex_exit(&rmp->rm_lock); 778 putnext(q, mp); 779 break; 780 } 781 782 switch (rmp->rm_type) { 783 case RPC_CLIENT: 784 mutex_exit(&rmp->rm_lock); 785 hdrsz = mp->b_wptr - mp->b_rptr; 786 787 /* 788 * Make sure the header is sane. 789 */ 790 if (hdrsz < TUNITDATAINDSZ || 791 hdrsz < (pptr->unitdata_ind.OPT_length + 792 pptr->unitdata_ind.OPT_offset) || 793 hdrsz < (pptr->unitdata_ind.SRC_length + 794 pptr->unitdata_ind.SRC_offset)) { 795 freemsg(mp); 796 return; 797 } 798 799 /* 800 * Call clnt_clts_dispatch_notify, so that it 801 * can pass the message to the proper caller. 802 * Don't discard the header just yet since the 803 * client may need the sender's address. 804 */ 805 clnt_clts_dispatch_notify(mp, hdrsz, 806 rmp->rm_zoneid); 807 return; 808 case RPC_SERVER: 809 /* 810 * rm_krpc_cell is exclusively used by the kRPC 811 * CLTS server 812 */ 813 if (rmp->rm_krpc_cell) { 814 #ifdef DEBUG 815 /* 816 * Test duplicate request cache and 817 * rm_ref count handling by sending a 818 * duplicate every so often, if 819 * desired. 820 */ 821 if (rpcmod_send_dup && 822 rpcmod_send_dup_cnt++ % 823 rpcmod_send_dup) 824 nmp = copymsg(mp); 825 else 826 nmp = NULL; 827 #endif 828 /* 829 * Raise the reference count on this 830 * module to prevent it from being 831 * popped before krpc generates the 832 * reply. 833 */ 834 rmp->rm_ref++; 835 mutex_exit(&rmp->rm_lock); 836 837 /* 838 * Submit the message to krpc. 839 */ 840 svc_queuereq(q, mp); 841 #ifdef DEBUG 842 /* 843 * Send duplicate if we created one. 844 */ 845 if (nmp) { 846 mutex_enter(&rmp->rm_lock); 847 rmp->rm_ref++; 848 mutex_exit(&rmp->rm_lock); 849 svc_queuereq(q, nmp); 850 } 851 #endif 852 } else { 853 mutex_exit(&rmp->rm_lock); 854 freemsg(mp); 855 } 856 return; 857 default: 858 mutex_exit(&rmp->rm_lock); 859 freemsg(mp); 860 return; 861 } /* end switch(rmp->rm_type) */ 862 } else if (pptr->type == T_UDERROR_IND) { 863 mutex_enter(&rmp->rm_lock); 864 hdrsz = mp->b_wptr - mp->b_rptr; 865 866 /* 867 * Make sure the header is sane 868 */ 869 if (hdrsz < TUDERRORINDSZ || 870 hdrsz < (pptr->uderror_ind.OPT_length + 871 pptr->uderror_ind.OPT_offset) || 872 hdrsz < (pptr->uderror_ind.DEST_length + 873 pptr->uderror_ind.DEST_offset)) { 874 mutex_exit(&rmp->rm_lock); 875 freemsg(mp); 876 return; 877 } 878 879 /* 880 * In the case where a unit data error has been 881 * received, all we need to do is clear the message from 882 * the queue. 883 */ 884 mutex_exit(&rmp->rm_lock); 885 freemsg(mp); 886 RPCLOG(32, "rpcmodrput: unitdata error received at " 887 "%ld\n", gethrestime_sec()); 888 return; 889 } /* end else if (pptr->type == T_UDERROR_IND) */ 890 891 putnext(q, mp); 892 break; 893 } /* end switch (mp->b_datap->db_type) */ 894 895 TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END, 896 "rpcmodrput_end:"); 897 /* 898 * Return codes are not looked at by the STREAMS framework. 899 */ 900 } 901 902 /* 903 * write put procedure 904 */ 905 void 906 rpcmodwput(queue_t *q, mblk_t *mp) 907 { 908 struct rpcm *rmp; 909 910 ASSERT(q != NULL); 911 912 switch (mp->b_datap->db_type) { 913 case M_PROTO: 914 case M_PCPROTO: 915 break; 916 default: 917 rpcmodwput_other(q, mp); 918 return; 919 } 920 921 /* 922 * Check to see if we can send the message downstream. 923 */ 924 if (canputnext(q)) { 925 putnext(q, mp); 926 return; 927 } 928 929 rmp = (struct rpcm *)q->q_ptr; 930 ASSERT(rmp != NULL); 931 932 /* 933 * The first canputnext failed. Try again except this time with the 934 * lock held, so that we can check the state of the stream to see if 935 * it is closing. If either of these conditions evaluate to true 936 * then send the meesage. 937 */ 938 mutex_enter(&rmp->rm_lock); 939 if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) { 940 mutex_exit(&rmp->rm_lock); 941 putnext(q, mp); 942 } else { 943 /* 944 * canputnext failed again and the stream is not closing. 945 * Place the message on the queue and let the service 946 * procedure handle the message. 947 */ 948 mutex_exit(&rmp->rm_lock); 949 (void) putq(q, mp); 950 } 951 } 952 953 static void 954 rpcmodwput_other(queue_t *q, mblk_t *mp) 955 { 956 struct rpcm *rmp; 957 struct iocblk *iocp; 958 959 rmp = (struct rpcm *)q->q_ptr; 960 ASSERT(rmp != NULL); 961 962 switch (mp->b_datap->db_type) { 963 case M_IOCTL: 964 iocp = (struct iocblk *)mp->b_rptr; 965 ASSERT(iocp != NULL); 966 switch (iocp->ioc_cmd) { 967 case RPC_CLIENT: 968 case RPC_SERVER: 969 mutex_enter(&rmp->rm_lock); 970 rmp->rm_type = iocp->ioc_cmd; 971 mutex_exit(&rmp->rm_lock); 972 mp->b_datap->db_type = M_IOCACK; 973 qreply(q, mp); 974 return; 975 default: 976 /* 977 * pass the ioctl downstream and hope someone 978 * down there knows how to handle it. 979 */ 980 putnext(q, mp); 981 return; 982 } 983 default: 984 break; 985 } 986 /* 987 * This is something we definitely do not know how to handle, just 988 * pass the message downstream 989 */ 990 putnext(q, mp); 991 } 992 993 /* 994 * Module write service procedure. This is called by downstream modules 995 * for back enabling during flow control. 996 */ 997 void 998 rpcmodwsrv(queue_t *q) 999 { 1000 struct rpcm *rmp; 1001 mblk_t *mp = NULL; 1002 1003 rmp = (struct rpcm *)q->q_ptr; 1004 ASSERT(rmp != NULL); 1005 1006 /* 1007 * Get messages that may be queued and send them down stream 1008 */ 1009 while ((mp = getq(q)) != NULL) { 1010 /* 1011 * Optimize the service procedure for the server-side, by 1012 * avoiding a call to canputnext(). 1013 */ 1014 if (rmp->rm_type == RPC_SERVER || canputnext(q)) { 1015 putnext(q, mp); 1016 continue; 1017 } 1018 (void) putbq(q, mp); 1019 return; 1020 } 1021 } 1022 1023 static void 1024 rpcmod_release(queue_t *q, mblk_t *bp) 1025 { 1026 struct rpcm *rmp; 1027 1028 /* 1029 * For now, just free the message. 1030 */ 1031 if (bp) 1032 freemsg(bp); 1033 rmp = (struct rpcm *)q->q_ptr; 1034 1035 mutex_enter(&rmp->rm_lock); 1036 rmp->rm_ref--; 1037 1038 if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) { 1039 cv_broadcast(&rmp->rm_cwait); 1040 } 1041 1042 mutex_exit(&rmp->rm_lock); 1043 } 1044 1045 /* 1046 * This part of rpcmod is pushed on a connection-oriented transport for use 1047 * by RPC. It serves to bypass the Stream head, implements 1048 * the record marking protocol, and dispatches incoming RPC messages. 1049 */ 1050 1051 /* Default idle timer values */ 1052 #define MIR_CLNT_IDLE_TIMEOUT (5 * (60 * 1000L)) /* 5 minutes */ 1053 #define MIR_SVC_IDLE_TIMEOUT (6 * (60 * 1000L)) /* 6 minutes */ 1054 #define MIR_SVC_ORDREL_TIMEOUT (10 * (60 * 1000L)) /* 10 minutes */ 1055 #define MIR_LASTFRAG 0x80000000 /* Record marker */ 1056 1057 #define DLEN(mp) (mp->b_cont ? msgdsize(mp) : (mp->b_wptr - mp->b_rptr)) 1058 1059 #define MIR_SVC_QUIESCED(mir) \ 1060 (mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0) 1061 1062 #define MIR_CLEAR_INRSRV(mir_ptr) { \ 1063 (mir_ptr)->mir_inrservice = 0; \ 1064 if ((mir_ptr)->mir_type == RPC_SERVER && \ 1065 (mir_ptr)->mir_closing) \ 1066 cv_signal(&(mir_ptr)->mir_condvar); \ 1067 } 1068 1069 /* 1070 * Don't block service procedure (and mir_close) if 1071 * we are in the process of closing. 1072 */ 1073 #define MIR_WCANPUTNEXT(mir_ptr, write_q) \ 1074 (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1)) 1075 1076 static int mir_clnt_dup_request(queue_t *q, mblk_t *mp); 1077 static void mir_rput_proto(queue_t *q, mblk_t *mp); 1078 static int mir_svc_policy_notify(queue_t *q, int event); 1079 static void mir_svc_release(queue_t *wq, mblk_t *mp); 1080 static void mir_svc_start(queue_t *wq); 1081 static void mir_svc_idle_start(queue_t *, mir_t *); 1082 static void mir_svc_idle_stop(queue_t *, mir_t *); 1083 static void mir_svc_start_close(queue_t *, mir_t *); 1084 static void mir_clnt_idle_do_stop(queue_t *); 1085 static void mir_clnt_idle_stop(queue_t *, mir_t *); 1086 static void mir_clnt_idle_start(queue_t *, mir_t *); 1087 static void mir_wput(queue_t *q, mblk_t *mp); 1088 static void mir_wput_other(queue_t *q, mblk_t *mp); 1089 static void mir_wsrv(queue_t *q); 1090 static void mir_disconnect(queue_t *, mir_t *ir); 1091 static int mir_check_len(queue_t *, int32_t, mblk_t *); 1092 static void mir_timer(void *); 1093 1094 extern void (*mir_rele)(queue_t *, mblk_t *); 1095 extern void (*mir_start)(queue_t *); 1096 extern void (*clnt_stop_idle)(queue_t *); 1097 1098 clock_t clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT; 1099 clock_t svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT; 1100 1101 /* 1102 * Timeout for subsequent notifications of idle connection. This is 1103 * typically used to clean up after a wedged orderly release. 1104 */ 1105 clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */ 1106 1107 extern uint_t *clnt_max_msg_sizep; 1108 extern uint_t *svc_max_msg_sizep; 1109 uint_t clnt_max_msg_size = RPC_MAXDATASIZE; 1110 uint_t svc_max_msg_size = RPC_MAXDATASIZE; 1111 uint_t mir_krpc_cell_null; 1112 1113 static void 1114 mir_timer_stop(mir_t *mir) 1115 { 1116 timeout_id_t tid; 1117 1118 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 1119 1120 /* 1121 * Since the mir_mutex lock needs to be released to call 1122 * untimeout(), we need to make sure that no other thread 1123 * can start/stop the timer (changing mir_timer_id) during 1124 * that time. The mir_timer_call bit and the mir_timer_cv 1125 * condition variable are used to synchronize this. Setting 1126 * mir_timer_call also tells mir_timer() (refer to the comments 1127 * in mir_timer()) that it does not need to do anything. 1128 */ 1129 while (mir->mir_timer_call) 1130 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex); 1131 mir->mir_timer_call = B_TRUE; 1132 1133 if ((tid = mir->mir_timer_id) != 0) { 1134 mir->mir_timer_id = 0; 1135 mutex_exit(&mir->mir_mutex); 1136 (void) untimeout(tid); 1137 mutex_enter(&mir->mir_mutex); 1138 } 1139 mir->mir_timer_call = B_FALSE; 1140 cv_broadcast(&mir->mir_timer_cv); 1141 } 1142 1143 static void 1144 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl) 1145 { 1146 timeout_id_t tid; 1147 1148 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 1149 1150 while (mir->mir_timer_call) 1151 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex); 1152 mir->mir_timer_call = B_TRUE; 1153 1154 if ((tid = mir->mir_timer_id) != 0) { 1155 mutex_exit(&mir->mir_mutex); 1156 (void) untimeout(tid); 1157 mutex_enter(&mir->mir_mutex); 1158 } 1159 /* Only start the timer when it is not closing. */ 1160 if (!mir->mir_closing) { 1161 mir->mir_timer_id = timeout(mir_timer, q, 1162 MSEC_TO_TICK(intrvl)); 1163 } 1164 mir->mir_timer_call = B_FALSE; 1165 cv_broadcast(&mir->mir_timer_cv); 1166 } 1167 1168 static int 1169 mir_clnt_dup_request(queue_t *q, mblk_t *mp) 1170 { 1171 mblk_t *mp1; 1172 uint32_t new_xid; 1173 uint32_t old_xid; 1174 1175 ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex)); 1176 new_xid = BE32_TO_U32(&mp->b_rptr[4]); 1177 /* 1178 * This loop is a bit tacky -- it walks the STREAMS list of 1179 * flow-controlled messages. 1180 */ 1181 if ((mp1 = q->q_first) != NULL) { 1182 do { 1183 old_xid = BE32_TO_U32(&mp1->b_rptr[4]); 1184 if (new_xid == old_xid) 1185 return (1); 1186 } while ((mp1 = mp1->b_next) != NULL); 1187 } 1188 return (0); 1189 } 1190 1191 static int 1192 mir_close(queue_t *q) 1193 { 1194 mir_t *mir = q->q_ptr; 1195 mblk_t *mp; 1196 bool_t queue_cleaned = FALSE; 1197 1198 RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q); 1199 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 1200 mutex_enter(&mir->mir_mutex); 1201 if ((mp = mir->mir_head_mp) != NULL) { 1202 mir->mir_head_mp = NULL; 1203 mir->mir_tail_mp = NULL; 1204 freemsg(mp); 1205 } 1206 /* 1207 * Set mir_closing so we get notified when MIR_SVC_QUIESCED() 1208 * is TRUE. And mir_timer_start() won't start the timer again. 1209 */ 1210 mir->mir_closing = B_TRUE; 1211 mir_timer_stop(mir); 1212 1213 if (mir->mir_type == RPC_SERVER) { 1214 flushq(q, FLUSHDATA); /* Ditch anything waiting on read q */ 1215 1216 /* 1217 * This will prevent more requests from arriving and 1218 * will force rpcmod to ignore flow control. 1219 */ 1220 mir_svc_start_close(WR(q), mir); 1221 1222 while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) { 1223 1224 if (mir->mir_ref_cnt && !mir->mir_inrservice && 1225 (queue_cleaned == FALSE)) { 1226 /* 1227 * call into SVC to clean the queue 1228 */ 1229 mutex_exit(&mir->mir_mutex); 1230 svc_queueclean(q); 1231 queue_cleaned = TRUE; 1232 mutex_enter(&mir->mir_mutex); 1233 continue; 1234 } 1235 1236 /* 1237 * Bugid 1253810 - Force the write service 1238 * procedure to send its messages, regardless 1239 * whether the downstream module is ready 1240 * to accept data. 1241 */ 1242 if (mir->mir_inwservice == 1) 1243 qenable(WR(q)); 1244 1245 cv_wait(&mir->mir_condvar, &mir->mir_mutex); 1246 } 1247 1248 mutex_exit(&mir->mir_mutex); 1249 qprocsoff(q); 1250 1251 /* Notify KRPC that this stream is going away. */ 1252 svc_queueclose(q); 1253 } else { 1254 mutex_exit(&mir->mir_mutex); 1255 qprocsoff(q); 1256 } 1257 1258 mutex_destroy(&mir->mir_mutex); 1259 cv_destroy(&mir->mir_condvar); 1260 cv_destroy(&mir->mir_timer_cv); 1261 kmem_free(mir, sizeof (mir_t)); 1262 return (0); 1263 } 1264 1265 /* 1266 * This is server side only (RPC_SERVER). 1267 * 1268 * Exit idle mode. 1269 */ 1270 static void 1271 mir_svc_idle_stop(queue_t *q, mir_t *mir) 1272 { 1273 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 1274 ASSERT((q->q_flag & QREADR) == 0); 1275 ASSERT(mir->mir_type == RPC_SERVER); 1276 RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q); 1277 1278 mir_timer_stop(mir); 1279 } 1280 1281 /* 1282 * This is server side only (RPC_SERVER). 1283 * 1284 * Start idle processing, which will include setting idle timer if the 1285 * stream is not being closed. 1286 */ 1287 static void 1288 mir_svc_idle_start(queue_t *q, mir_t *mir) 1289 { 1290 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 1291 ASSERT((q->q_flag & QREADR) == 0); 1292 ASSERT(mir->mir_type == RPC_SERVER); 1293 RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q); 1294 1295 /* 1296 * Don't re-start idle timer if we are closing queues. 1297 */ 1298 if (mir->mir_closing) { 1299 RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n", 1300 (void *)q); 1301 1302 /* 1303 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED() 1304 * is true. When it is true, and we are in the process of 1305 * closing the stream, signal any thread waiting in 1306 * mir_close(). 1307 */ 1308 if (mir->mir_inwservice == 0) 1309 cv_signal(&mir->mir_condvar); 1310 1311 } else { 1312 RPCLOG(16, "mir_svc_idle_start - reset %s timer\n", 1313 mir->mir_ordrel_pending ? "ordrel" : "normal"); 1314 /* 1315 * Normal condition, start the idle timer. If an orderly 1316 * release has been sent, set the timeout to wait for the 1317 * client to close its side of the connection. Otherwise, 1318 * use the normal idle timeout. 1319 */ 1320 mir_timer_start(q, mir, mir->mir_ordrel_pending ? 1321 svc_ordrel_timeout : mir->mir_idle_timeout); 1322 } 1323 } 1324 1325 /* ARGSUSED */ 1326 static int 1327 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp) 1328 { 1329 mir_t *mir; 1330 1331 RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q); 1332 /* Set variables used directly by KRPC. */ 1333 if (!mir_rele) 1334 mir_rele = mir_svc_release; 1335 if (!mir_start) 1336 mir_start = mir_svc_start; 1337 if (!clnt_stop_idle) 1338 clnt_stop_idle = mir_clnt_idle_do_stop; 1339 if (!clnt_max_msg_sizep) 1340 clnt_max_msg_sizep = &clnt_max_msg_size; 1341 if (!svc_max_msg_sizep) 1342 svc_max_msg_sizep = &svc_max_msg_size; 1343 1344 /* Allocate a zero'ed out mir structure for this stream. */ 1345 mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP); 1346 1347 /* 1348 * We set hold inbound here so that incoming messages will 1349 * be held on the read-side queue until the stream is completely 1350 * initialized with a RPC_CLIENT or RPC_SERVER ioctl. During 1351 * the ioctl processing, the flag is cleared and any messages that 1352 * arrived between the open and the ioctl are delivered to KRPC. 1353 * 1354 * Early data should never arrive on a client stream since 1355 * servers only respond to our requests and we do not send any. 1356 * until after the stream is initialized. Early data is 1357 * very common on a server stream where the client will start 1358 * sending data as soon as the connection is made (and this 1359 * is especially true with TCP where the protocol accepts the 1360 * connection before nfsd or KRPC is notified about it). 1361 */ 1362 1363 mir->mir_hold_inbound = 1; 1364 1365 /* 1366 * Start the record marker looking for a 4-byte header. When 1367 * this length is negative, it indicates that rpcmod is looking 1368 * for bytes to consume for the record marker header. When it 1369 * is positive, it holds the number of bytes that have arrived 1370 * for the current fragment and are being held in mir_header_mp. 1371 */ 1372 1373 mir->mir_frag_len = -(int32_t)sizeof (uint32_t); 1374 1375 mir->mir_zoneid = rpc_zoneid(); 1376 mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL); 1377 cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL); 1378 cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL); 1379 1380 q->q_ptr = (char *)mir; 1381 WR(q)->q_ptr = (char *)mir; 1382 1383 /* 1384 * We noenable the read-side queue because we don't want it 1385 * automatically enabled by putq. We enable it explicitly 1386 * in mir_wsrv when appropriate. (See additional comments on 1387 * flow control at the beginning of mir_rsrv.) 1388 */ 1389 noenable(q); 1390 1391 qprocson(q); 1392 return (0); 1393 } 1394 1395 /* 1396 * Read-side put routine for both the client and server side. Does the 1397 * record marking for incoming RPC messages, and when complete, dispatches 1398 * the message to either the client or server. 1399 */ 1400 static void 1401 mir_rput(queue_t *q, mblk_t *mp) 1402 { 1403 int excess; 1404 int32_t frag_len, frag_header; 1405 mblk_t *cont_mp, *head_mp, *tail_mp, *mp1; 1406 mir_t *mir = q->q_ptr; 1407 boolean_t stop_timer = B_FALSE; 1408 1409 ASSERT(mir != NULL); 1410 1411 /* 1412 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER 1413 * with the corresponding ioctl, then don't accept 1414 * any inbound data. This should never happen for streams 1415 * created by nfsd or client-side KRPC because they are careful 1416 * to set the mode of the stream before doing anything else. 1417 */ 1418 if (mir->mir_type == 0) { 1419 freemsg(mp); 1420 return; 1421 } 1422 1423 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 1424 1425 switch (mp->b_datap->db_type) { 1426 case M_DATA: 1427 break; 1428 case M_PROTO: 1429 case M_PCPROTO: 1430 if (MBLKL(mp) < sizeof (t_scalar_t)) { 1431 RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n", 1432 (int)MBLKL(mp)); 1433 freemsg(mp); 1434 return; 1435 } 1436 if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) { 1437 mir_rput_proto(q, mp); 1438 return; 1439 } 1440 1441 /* Throw away the T_DATA_IND block and continue with data. */ 1442 mp1 = mp; 1443 mp = mp->b_cont; 1444 freeb(mp1); 1445 break; 1446 case M_SETOPTS: 1447 /* 1448 * If a module on the stream is trying set the Stream head's 1449 * high water mark, then set our hiwater to the requested 1450 * value. We are the "stream head" for all inbound 1451 * data messages since messages are passed directly to KRPC. 1452 */ 1453 if (MBLKL(mp) >= sizeof (struct stroptions)) { 1454 struct stroptions *stropts; 1455 1456 stropts = (struct stroptions *)mp->b_rptr; 1457 if ((stropts->so_flags & SO_HIWAT) && 1458 !(stropts->so_flags & SO_BAND)) { 1459 (void) strqset(q, QHIWAT, 0, stropts->so_hiwat); 1460 } 1461 } 1462 putnext(q, mp); 1463 return; 1464 case M_FLUSH: 1465 RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr); 1466 RPCLOG(32, "on q 0x%p\n", (void *)q); 1467 putnext(q, mp); 1468 return; 1469 default: 1470 putnext(q, mp); 1471 return; 1472 } 1473 1474 mutex_enter(&mir->mir_mutex); 1475 1476 /* 1477 * If this connection is closing, don't accept any new messages. 1478 */ 1479 if (mir->mir_svc_no_more_msgs) { 1480 ASSERT(mir->mir_type == RPC_SERVER); 1481 mutex_exit(&mir->mir_mutex); 1482 freemsg(mp); 1483 return; 1484 } 1485 1486 /* Get local copies for quicker access. */ 1487 frag_len = mir->mir_frag_len; 1488 frag_header = mir->mir_frag_header; 1489 head_mp = mir->mir_head_mp; 1490 tail_mp = mir->mir_tail_mp; 1491 1492 /* Loop, processing each message block in the mp chain separately. */ 1493 do { 1494 cont_mp = mp->b_cont; 1495 mp->b_cont = NULL; 1496 1497 /* 1498 * Drop zero-length mblks to prevent unbounded kernel memory 1499 * consumption. 1500 */ 1501 if (MBLKL(mp) == 0) { 1502 freeb(mp); 1503 continue; 1504 } 1505 1506 /* 1507 * If frag_len is negative, we're still in the process of 1508 * building frag_header -- try to complete it with this mblk. 1509 */ 1510 while (frag_len < 0 && mp->b_rptr < mp->b_wptr) { 1511 frag_len++; 1512 frag_header <<= 8; 1513 frag_header += *mp->b_rptr++; 1514 } 1515 1516 if (MBLKL(mp) == 0 && frag_len < 0) { 1517 /* 1518 * We consumed this mblk while trying to complete the 1519 * fragment header. Free it and move on. 1520 */ 1521 freeb(mp); 1522 continue; 1523 } 1524 1525 ASSERT(frag_len >= 0); 1526 1527 /* 1528 * Now frag_header has the number of bytes in this fragment 1529 * and we're just waiting to collect them all. Chain our 1530 * latest mblk onto the list and see if we now have enough 1531 * bytes to complete the fragment. 1532 */ 1533 if (head_mp == NULL) { 1534 ASSERT(tail_mp == NULL); 1535 head_mp = tail_mp = mp; 1536 } else { 1537 tail_mp->b_cont = mp; 1538 tail_mp = mp; 1539 } 1540 1541 frag_len += MBLKL(mp); 1542 excess = frag_len - (frag_header & ~MIR_LASTFRAG); 1543 if (excess < 0) { 1544 /* 1545 * We still haven't received enough data to complete 1546 * the fragment, so continue on to the next mblk. 1547 */ 1548 continue; 1549 } 1550 1551 /* 1552 * We've got a complete fragment. If there are excess bytes, 1553 * then they're part of the next fragment's header (of either 1554 * this RPC message or the next RPC message). Split that part 1555 * into its own mblk so that we can safely freeb() it when 1556 * building frag_header above. 1557 */ 1558 if (excess > 0) { 1559 if ((mp1 = dupb(mp)) == NULL && 1560 (mp1 = copyb(mp)) == NULL) { 1561 freemsg(head_mp); 1562 freemsg(cont_mp); 1563 RPCLOG0(1, "mir_rput: dupb/copyb failed\n"); 1564 mir->mir_frag_header = 0; 1565 mir->mir_frag_len = -(int32_t)sizeof (uint32_t); 1566 mir->mir_head_mp = NULL; 1567 mir->mir_tail_mp = NULL; 1568 mir_disconnect(q, mir); /* drops mir_mutex */ 1569 return; 1570 } 1571 1572 /* 1573 * Relink the message chain so that the next mblk is 1574 * the next fragment header, followed by the rest of 1575 * the message chain. 1576 */ 1577 mp1->b_cont = cont_mp; 1578 cont_mp = mp1; 1579 1580 /* 1581 * Data in the new mblk begins at the next fragment, 1582 * and data in the old mblk ends at the next fragment. 1583 */ 1584 mp1->b_rptr = mp1->b_wptr - excess; 1585 mp->b_wptr -= excess; 1586 } 1587 1588 /* 1589 * Reset frag_len and frag_header for the next fragment. 1590 */ 1591 frag_len = -(int32_t)sizeof (uint32_t); 1592 if (!(frag_header & MIR_LASTFRAG)) { 1593 /* 1594 * The current fragment is complete, but more 1595 * fragments need to be processed before we can 1596 * pass along the RPC message headed at head_mp. 1597 */ 1598 frag_header = 0; 1599 continue; 1600 } 1601 frag_header = 0; 1602 1603 /* 1604 * We've got a complete RPC message; pass it to the 1605 * appropriate consumer. 1606 */ 1607 switch (mir->mir_type) { 1608 case RPC_CLIENT: 1609 if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) { 1610 /* 1611 * Mark this stream as active. This marker 1612 * is used in mir_timer(). 1613 */ 1614 mir->mir_clntreq = 1; 1615 mir->mir_use_timestamp = lbolt; 1616 } else { 1617 freemsg(head_mp); 1618 } 1619 break; 1620 1621 case RPC_SERVER: 1622 /* 1623 * Check for flow control before passing the 1624 * message to KRPC. 1625 */ 1626 if (!mir->mir_hold_inbound) { 1627 if (mir->mir_krpc_cell) { 1628 /* 1629 * If the reference count is 0 1630 * (not including this request), 1631 * then the stream is transitioning 1632 * from idle to non-idle. In this case, 1633 * we cancel the idle timer. 1634 */ 1635 if (mir->mir_ref_cnt++ == 0) 1636 stop_timer = B_TRUE; 1637 if (mir_check_len(q, 1638 (int32_t)msgdsize(mp), mp)) 1639 return; 1640 svc_queuereq(q, head_mp); /* to KRPC */ 1641 } else { 1642 /* 1643 * Count # of times this happens. Should 1644 * be never, but experience shows 1645 * otherwise. 1646 */ 1647 mir_krpc_cell_null++; 1648 freemsg(head_mp); 1649 } 1650 } else { 1651 /* 1652 * If the outbound side of the stream is 1653 * flow controlled, then hold this message 1654 * until client catches up. mir_hold_inbound 1655 * is set in mir_wput and cleared in mir_wsrv. 1656 */ 1657 (void) putq(q, head_mp); 1658 mir->mir_inrservice = B_TRUE; 1659 } 1660 break; 1661 default: 1662 RPCLOG(1, "mir_rput: unknown mir_type %d\n", 1663 mir->mir_type); 1664 freemsg(head_mp); 1665 break; 1666 } 1667 1668 /* 1669 * Reset the chain since we're starting on a new RPC message. 1670 */ 1671 head_mp = tail_mp = NULL; 1672 } while ((mp = cont_mp) != NULL); 1673 1674 /* 1675 * Sanity check the message length; if it's too large mir_check_len() 1676 * will shutdown the connection, drop mir_mutex, and return non-zero. 1677 */ 1678 if (head_mp != NULL && mir->mir_setup_complete && 1679 mir_check_len(q, frag_len, head_mp)) 1680 return; 1681 1682 /* Save our local copies back in the mir structure. */ 1683 mir->mir_frag_header = frag_header; 1684 mir->mir_frag_len = frag_len; 1685 mir->mir_head_mp = head_mp; 1686 mir->mir_tail_mp = tail_mp; 1687 1688 /* 1689 * The timer is stopped after the whole message chain is processed. 1690 * The reason is that stopping the timer releases the mir_mutex 1691 * lock temporarily. This means that the request can be serviced 1692 * while we are still processing the message chain. This is not 1693 * good. So we stop the timer here instead. 1694 * 1695 * Note that if the timer fires before we stop it, it will not 1696 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer() 1697 * will just return. 1698 */ 1699 if (stop_timer) { 1700 RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because " 1701 "ref cnt going to non zero\n", (void *)WR(q)); 1702 mir_svc_idle_stop(WR(q), mir); 1703 } 1704 mutex_exit(&mir->mir_mutex); 1705 } 1706 1707 static void 1708 mir_rput_proto(queue_t *q, mblk_t *mp) 1709 { 1710 mir_t *mir = (mir_t *)q->q_ptr; 1711 uint32_t type; 1712 uint32_t reason = 0; 1713 1714 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 1715 1716 type = ((union T_primitives *)mp->b_rptr)->type; 1717 switch (mir->mir_type) { 1718 case RPC_CLIENT: 1719 switch (type) { 1720 case T_DISCON_IND: 1721 reason = ((struct T_discon_ind *) 1722 (mp->b_rptr))->DISCON_reason; 1723 /*FALLTHROUGH*/ 1724 case T_ORDREL_IND: 1725 mutex_enter(&mir->mir_mutex); 1726 if (mir->mir_head_mp) { 1727 freemsg(mir->mir_head_mp); 1728 mir->mir_head_mp = (mblk_t *)0; 1729 mir->mir_tail_mp = (mblk_t *)0; 1730 } 1731 /* 1732 * We are disconnecting, but not necessarily 1733 * closing. By not closing, we will fail to 1734 * pick up a possibly changed global timeout value, 1735 * unless we store it now. 1736 */ 1737 mir->mir_idle_timeout = clnt_idle_timeout; 1738 mir_clnt_idle_stop(WR(q), mir); 1739 1740 /* 1741 * Even though we are unconnected, we still 1742 * leave the idle timer going on the client. The 1743 * reason for is that if we've disconnected due 1744 * to a server-side disconnect, reset, or connection 1745 * timeout, there is a possibility the client may 1746 * retry the RPC request. This retry needs to done on 1747 * the same bound address for the server to interpret 1748 * it as such. However, we don't want 1749 * to wait forever for that possibility. If the 1750 * end-point stays unconnected for mir_idle_timeout 1751 * units of time, then that is a signal to the 1752 * connection manager to give up waiting for the 1753 * application (eg. NFS) to send a retry. 1754 */ 1755 mir_clnt_idle_start(WR(q), mir); 1756 mutex_exit(&mir->mir_mutex); 1757 clnt_dispatch_notifyall(WR(q), type, reason); 1758 freemsg(mp); 1759 return; 1760 case T_ERROR_ACK: 1761 { 1762 struct T_error_ack *terror; 1763 1764 terror = (struct T_error_ack *)mp->b_rptr; 1765 RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p", 1766 (void *)q); 1767 RPCLOG(1, " ERROR_prim: %s,", 1768 rpc_tpiprim2name(terror->ERROR_prim)); 1769 RPCLOG(1, " TLI_error: %s,", 1770 rpc_tpierr2name(terror->TLI_error)); 1771 RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error); 1772 if (terror->ERROR_prim == T_DISCON_REQ) { 1773 clnt_dispatch_notifyall(WR(q), type, reason); 1774 freemsg(mp); 1775 return; 1776 } else { 1777 if (clnt_dispatch_notifyconn(WR(q), mp)) 1778 return; 1779 } 1780 break; 1781 } 1782 case T_OK_ACK: 1783 { 1784 struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr; 1785 1786 if (tok->CORRECT_prim == T_DISCON_REQ) { 1787 clnt_dispatch_notifyall(WR(q), type, reason); 1788 freemsg(mp); 1789 return; 1790 } else { 1791 if (clnt_dispatch_notifyconn(WR(q), mp)) 1792 return; 1793 } 1794 break; 1795 } 1796 case T_CONN_CON: 1797 case T_INFO_ACK: 1798 case T_OPTMGMT_ACK: 1799 if (clnt_dispatch_notifyconn(WR(q), mp)) 1800 return; 1801 break; 1802 case T_BIND_ACK: 1803 break; 1804 default: 1805 RPCLOG(1, "mir_rput: unexpected message %d " 1806 "for KRPC client\n", 1807 ((union T_primitives *)mp->b_rptr)->type); 1808 break; 1809 } 1810 break; 1811 1812 case RPC_SERVER: 1813 switch (type) { 1814 case T_BIND_ACK: 1815 { 1816 struct T_bind_ack *tbind; 1817 1818 /* 1819 * If this is a listening stream, then shut 1820 * off the idle timer. 1821 */ 1822 tbind = (struct T_bind_ack *)mp->b_rptr; 1823 if (tbind->CONIND_number > 0) { 1824 mutex_enter(&mir->mir_mutex); 1825 mir_svc_idle_stop(WR(q), mir); 1826 1827 /* 1828 * mark this as a listen endpoint 1829 * for special handling. 1830 */ 1831 1832 mir->mir_listen_stream = 1; 1833 mutex_exit(&mir->mir_mutex); 1834 } 1835 break; 1836 } 1837 case T_DISCON_IND: 1838 case T_ORDREL_IND: 1839 RPCLOG(16, "mir_rput_proto: got %s indication\n", 1840 type == T_DISCON_IND ? "disconnect" 1841 : "orderly release"); 1842 1843 /* 1844 * For listen endpoint just pass 1845 * on the message. 1846 */ 1847 1848 if (mir->mir_listen_stream) 1849 break; 1850 1851 mutex_enter(&mir->mir_mutex); 1852 1853 /* 1854 * If client wants to break off connection, record 1855 * that fact. 1856 */ 1857 mir_svc_start_close(WR(q), mir); 1858 1859 /* 1860 * If we are idle, then send the orderly release 1861 * or disconnect indication to nfsd. 1862 */ 1863 if (MIR_SVC_QUIESCED(mir)) { 1864 mutex_exit(&mir->mir_mutex); 1865 break; 1866 } 1867 1868 RPCLOG(16, "mir_rput_proto: not idle, so " 1869 "disconnect/ord rel indication not passed " 1870 "upstream on 0x%p\n", (void *)q); 1871 1872 /* 1873 * Hold the indication until we get idle 1874 * If there already is an indication stored, 1875 * replace it if the new one is a disconnect. The 1876 * reasoning is that disconnection takes less time 1877 * to process, and once a client decides to 1878 * disconnect, we should do that. 1879 */ 1880 if (mir->mir_svc_pend_mp) { 1881 if (type == T_DISCON_IND) { 1882 RPCLOG(16, "mir_rput_proto: replacing" 1883 " held disconnect/ord rel" 1884 " indication with disconnect on" 1885 " 0x%p\n", (void *)q); 1886 1887 freemsg(mir->mir_svc_pend_mp); 1888 mir->mir_svc_pend_mp = mp; 1889 } else { 1890 RPCLOG(16, "mir_rput_proto: already " 1891 "held a disconnect/ord rel " 1892 "indication. freeing ord rel " 1893 "ind on 0x%p\n", (void *)q); 1894 freemsg(mp); 1895 } 1896 } else 1897 mir->mir_svc_pend_mp = mp; 1898 1899 mutex_exit(&mir->mir_mutex); 1900 return; 1901 1902 default: 1903 /* nfsd handles server-side non-data messages. */ 1904 break; 1905 } 1906 break; 1907 1908 default: 1909 break; 1910 } 1911 1912 putnext(q, mp); 1913 } 1914 1915 /* 1916 * The server-side read queues are used to hold inbound messages while 1917 * outbound flow control is exerted. When outbound flow control is 1918 * relieved, mir_wsrv qenables the read-side queue. Read-side queues 1919 * are not enabled by STREAMS and are explicitly noenable'ed in mir_open. 1920 * 1921 * For the server side, we have two types of messages queued. The first type 1922 * are messages that are ready to be XDR decoded and and then sent to the 1923 * RPC program's dispatch routine. The second type are "raw" messages that 1924 * haven't been processed, i.e. assembled from rpc record fragements into 1925 * full requests. The only time we will see the second type of message 1926 * queued is if we have a memory allocation failure while processing a 1927 * a raw message. The field mir_first_non_processed_mblk will mark the 1928 * first such raw message. So the flow for server side is: 1929 * 1930 * - send processed queued messages to kRPC until we run out or find 1931 * one that needs additional processing because we were short on memory 1932 * earlier 1933 * - process a message that was deferred because of lack of 1934 * memory 1935 * - continue processing messages until the queue empties or we 1936 * have to stop because of lack of memory 1937 * - during each of the above phase, if the queue is empty and 1938 * there are no pending messages that were passed to the RPC 1939 * layer, send upstream the pending disconnect/ordrel indication if 1940 * there is one 1941 * 1942 * The read-side queue is also enabled by a bufcall callback if dupmsg 1943 * fails in mir_rput. 1944 */ 1945 static void 1946 mir_rsrv(queue_t *q) 1947 { 1948 mir_t *mir; 1949 mblk_t *mp; 1950 mblk_t *cmp = NULL; 1951 boolean_t stop_timer = B_FALSE; 1952 1953 mir = (mir_t *)q->q_ptr; 1954 mutex_enter(&mir->mir_mutex); 1955 1956 mp = NULL; 1957 switch (mir->mir_type) { 1958 case RPC_SERVER: 1959 if (mir->mir_ref_cnt == 0) 1960 mir->mir_hold_inbound = 0; 1961 if (mir->mir_hold_inbound) { 1962 1963 ASSERT(cmp == NULL); 1964 if (q->q_first == NULL) { 1965 1966 MIR_CLEAR_INRSRV(mir); 1967 1968 if (MIR_SVC_QUIESCED(mir)) { 1969 cmp = mir->mir_svc_pend_mp; 1970 mir->mir_svc_pend_mp = NULL; 1971 } 1972 } 1973 1974 mutex_exit(&mir->mir_mutex); 1975 1976 if (cmp != NULL) { 1977 RPCLOG(16, "mir_rsrv: line %d: sending a held " 1978 "disconnect/ord rel indication upstream\n", 1979 __LINE__); 1980 putnext(q, cmp); 1981 } 1982 1983 return; 1984 } 1985 while (mp = getq(q)) { 1986 if (mir->mir_krpc_cell && 1987 (mir->mir_svc_no_more_msgs == 0)) { 1988 /* 1989 * If we were idle, turn off idle timer since 1990 * we aren't idle any more. 1991 */ 1992 if (mir->mir_ref_cnt++ == 0) 1993 stop_timer = B_TRUE; 1994 if (mir_check_len(q, 1995 (int32_t)msgdsize(mp), mp)) 1996 return; 1997 svc_queuereq(q, mp); 1998 } else { 1999 /* 2000 * Count # of times this happens. Should be 2001 * never, but experience shows otherwise. 2002 */ 2003 if (mir->mir_krpc_cell == NULL) 2004 mir_krpc_cell_null++; 2005 freemsg(mp); 2006 } 2007 } 2008 break; 2009 case RPC_CLIENT: 2010 break; 2011 default: 2012 RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type); 2013 2014 if (q->q_first == NULL) 2015 MIR_CLEAR_INRSRV(mir); 2016 2017 mutex_exit(&mir->mir_mutex); 2018 2019 return; 2020 } 2021 2022 /* 2023 * The timer is stopped after all the messages are processed. 2024 * The reason is that stopping the timer releases the mir_mutex 2025 * lock temporarily. This means that the request can be serviced 2026 * while we are still processing the message queue. This is not 2027 * good. So we stop the timer here instead. 2028 */ 2029 if (stop_timer) { 2030 RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref " 2031 "cnt going to non zero\n", (void *)WR(q)); 2032 mir_svc_idle_stop(WR(q), mir); 2033 } 2034 2035 if (q->q_first == NULL) { 2036 2037 MIR_CLEAR_INRSRV(mir); 2038 2039 ASSERT(cmp == NULL); 2040 if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) { 2041 cmp = mir->mir_svc_pend_mp; 2042 mir->mir_svc_pend_mp = NULL; 2043 } 2044 2045 mutex_exit(&mir->mir_mutex); 2046 2047 if (cmp != NULL) { 2048 RPCLOG(16, "mir_rsrv: line %d: sending a held " 2049 "disconnect/ord rel indication upstream\n", 2050 __LINE__); 2051 putnext(q, cmp); 2052 } 2053 2054 return; 2055 } 2056 mutex_exit(&mir->mir_mutex); 2057 } 2058 2059 static int mir_svc_policy_fails; 2060 2061 /* 2062 * Called to send an event code to nfsd/lockd so that it initiates 2063 * connection close. 2064 */ 2065 static int 2066 mir_svc_policy_notify(queue_t *q, int event) 2067 { 2068 mblk_t *mp; 2069 #ifdef DEBUG 2070 mir_t *mir = (mir_t *)q->q_ptr; 2071 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 2072 #endif 2073 ASSERT(q->q_flag & QREADR); 2074 2075 /* 2076 * Create an M_DATA message with the event code and pass it to the 2077 * Stream head (nfsd or whoever created the stream will consume it). 2078 */ 2079 mp = allocb(sizeof (int), BPRI_HI); 2080 2081 if (!mp) { 2082 2083 mir_svc_policy_fails++; 2084 RPCLOG(16, "mir_svc_policy_notify: could not allocate event " 2085 "%d\n", event); 2086 return (ENOMEM); 2087 } 2088 2089 U32_TO_BE32(event, mp->b_rptr); 2090 mp->b_wptr = mp->b_rptr + sizeof (int); 2091 putnext(q, mp); 2092 return (0); 2093 } 2094 2095 /* 2096 * Server side: start the close phase. We want to get this rpcmod slot in an 2097 * idle state before mir_close() is called. 2098 */ 2099 static void 2100 mir_svc_start_close(queue_t *wq, mir_t *mir) 2101 { 2102 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 2103 ASSERT((wq->q_flag & QREADR) == 0); 2104 ASSERT(mir->mir_type == RPC_SERVER); 2105 2106 2107 /* 2108 * Do not accept any more messages. 2109 */ 2110 mir->mir_svc_no_more_msgs = 1; 2111 2112 /* 2113 * Next two statements will make the read service procedure invoke 2114 * svc_queuereq() on everything stuck in the streams read queue. 2115 * It's not necessary because enabling the write queue will 2116 * have the same effect, but why not speed the process along? 2117 */ 2118 mir->mir_hold_inbound = 0; 2119 qenable(RD(wq)); 2120 2121 /* 2122 * Meanwhile force the write service procedure to send the 2123 * responses downstream, regardless of flow control. 2124 */ 2125 qenable(wq); 2126 } 2127 2128 /* 2129 * This routine is called directly by KRPC after a request is completed, 2130 * whether a reply was sent or the request was dropped. 2131 */ 2132 static void 2133 mir_svc_release(queue_t *wq, mblk_t *mp) 2134 { 2135 mir_t *mir = (mir_t *)wq->q_ptr; 2136 mblk_t *cmp = NULL; 2137 2138 ASSERT((wq->q_flag & QREADR) == 0); 2139 if (mp) 2140 freemsg(mp); 2141 2142 mutex_enter(&mir->mir_mutex); 2143 2144 /* 2145 * Start idle processing if this is the last reference. 2146 */ 2147 if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) { 2148 cmp = mir->mir_svc_pend_mp; 2149 mir->mir_svc_pend_mp = NULL; 2150 } 2151 2152 if (cmp) { 2153 RPCLOG(16, "mir_svc_release: sending a held " 2154 "disconnect/ord rel indication upstream on queue 0x%p\n", 2155 (void *)RD(wq)); 2156 2157 mutex_exit(&mir->mir_mutex); 2158 2159 putnext(RD(wq), cmp); 2160 2161 mutex_enter(&mir->mir_mutex); 2162 } 2163 2164 /* 2165 * Start idle processing if this is the last reference. 2166 */ 2167 if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) { 2168 2169 RPCLOG(16, "mir_svc_release starting idle timer on 0x%p " 2170 "because ref cnt is zero\n", (void *) wq); 2171 2172 mir_svc_idle_start(wq, mir); 2173 } 2174 2175 mir->mir_ref_cnt--; 2176 ASSERT(mir->mir_ref_cnt >= 0); 2177 2178 /* 2179 * Wake up the thread waiting to close. 2180 */ 2181 2182 if ((mir->mir_ref_cnt == 0) && mir->mir_closing) 2183 cv_signal(&mir->mir_condvar); 2184 2185 mutex_exit(&mir->mir_mutex); 2186 } 2187 2188 /* 2189 * This routine is called by server-side KRPC when it is ready to 2190 * handle inbound messages on the stream. 2191 */ 2192 static void 2193 mir_svc_start(queue_t *wq) 2194 { 2195 mir_t *mir = (mir_t *)wq->q_ptr; 2196 2197 /* 2198 * no longer need to take the mir_mutex because the 2199 * mir_setup_complete field has been moved out of 2200 * the binary field protected by the mir_mutex. 2201 */ 2202 2203 mir->mir_setup_complete = 1; 2204 qenable(RD(wq)); 2205 } 2206 2207 /* 2208 * client side wrapper for stopping timer with normal idle timeout. 2209 */ 2210 static void 2211 mir_clnt_idle_stop(queue_t *wq, mir_t *mir) 2212 { 2213 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 2214 ASSERT((wq->q_flag & QREADR) == 0); 2215 ASSERT(mir->mir_type == RPC_CLIENT); 2216 2217 mir_timer_stop(mir); 2218 } 2219 2220 /* 2221 * client side wrapper for stopping timer with normal idle timeout. 2222 */ 2223 static void 2224 mir_clnt_idle_start(queue_t *wq, mir_t *mir) 2225 { 2226 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 2227 ASSERT((wq->q_flag & QREADR) == 0); 2228 ASSERT(mir->mir_type == RPC_CLIENT); 2229 2230 mir_timer_start(wq, mir, mir->mir_idle_timeout); 2231 } 2232 2233 /* 2234 * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on 2235 * end-points that aren't connected. 2236 */ 2237 static void 2238 mir_clnt_idle_do_stop(queue_t *wq) 2239 { 2240 mir_t *mir = (mir_t *)wq->q_ptr; 2241 2242 RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq); 2243 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 2244 mutex_enter(&mir->mir_mutex); 2245 mir_clnt_idle_stop(wq, mir); 2246 mutex_exit(&mir->mir_mutex); 2247 } 2248 2249 /* 2250 * Timer handler. It handles idle timeout and memory shortage problem. 2251 */ 2252 static void 2253 mir_timer(void *arg) 2254 { 2255 queue_t *wq = (queue_t *)arg; 2256 mir_t *mir = (mir_t *)wq->q_ptr; 2257 boolean_t notify; 2258 2259 mutex_enter(&mir->mir_mutex); 2260 2261 /* 2262 * mir_timer_call is set only when either mir_timer_[start|stop] 2263 * is progressing. And mir_timer() can only be run while they 2264 * are progressing if the timer is being stopped. So just 2265 * return. 2266 */ 2267 if (mir->mir_timer_call) { 2268 mutex_exit(&mir->mir_mutex); 2269 return; 2270 } 2271 mir->mir_timer_id = 0; 2272 2273 switch (mir->mir_type) { 2274 case RPC_CLIENT: 2275 2276 /* 2277 * For clients, the timer fires at clnt_idle_timeout 2278 * intervals. If the activity marker (mir_clntreq) is 2279 * zero, then the stream has been idle since the last 2280 * timer event and we notify KRPC. If mir_clntreq is 2281 * non-zero, then the stream is active and we just 2282 * restart the timer for another interval. mir_clntreq 2283 * is set to 1 in mir_wput for every request passed 2284 * downstream. 2285 * 2286 * If this was a memory shortage timer reset the idle 2287 * timeout regardless; the mir_clntreq will not be a 2288 * valid indicator. 2289 * 2290 * The timer is initially started in mir_wput during 2291 * RPC_CLIENT ioctl processing. 2292 * 2293 * The timer interval can be changed for individual 2294 * streams with the ND variable "mir_idle_timeout". 2295 */ 2296 if (mir->mir_clntreq > 0 && mir->mir_use_timestamp + 2297 MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) { 2298 clock_t tout; 2299 2300 tout = mir->mir_idle_timeout - 2301 TICK_TO_MSEC(lbolt - mir->mir_use_timestamp); 2302 if (tout < 0) 2303 tout = 1000; 2304 #if 0 2305 printf("mir_timer[%d < %d + %d]: reset client timer " 2306 "to %d (ms)\n", TICK_TO_MSEC(lbolt), 2307 TICK_TO_MSEC(mir->mir_use_timestamp), 2308 mir->mir_idle_timeout, tout); 2309 #endif 2310 mir->mir_clntreq = 0; 2311 mir_timer_start(wq, mir, tout); 2312 mutex_exit(&mir->mir_mutex); 2313 return; 2314 } 2315 #if 0 2316 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz); 2317 #endif 2318 /* 2319 * We are disconnecting, but not necessarily 2320 * closing. By not closing, we will fail to 2321 * pick up a possibly changed global timeout value, 2322 * unless we store it now. 2323 */ 2324 mir->mir_idle_timeout = clnt_idle_timeout; 2325 mir_clnt_idle_start(wq, mir); 2326 2327 mutex_exit(&mir->mir_mutex); 2328 /* 2329 * We pass T_ORDREL_REQ as an integer value 2330 * to KRPC as the indication that the stream 2331 * is idle. This is not a T_ORDREL_REQ message, 2332 * it is just a convenient value since we call 2333 * the same KRPC routine for T_ORDREL_INDs and 2334 * T_DISCON_INDs. 2335 */ 2336 clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0); 2337 return; 2338 2339 case RPC_SERVER: 2340 2341 /* 2342 * For servers, the timer is only running when the stream 2343 * is really idle or memory is short. The timer is started 2344 * by mir_wput when mir_type is set to RPC_SERVER and 2345 * by mir_svc_idle_start whenever the stream goes idle 2346 * (mir_ref_cnt == 0). The timer is cancelled in 2347 * mir_rput whenever a new inbound request is passed to KRPC 2348 * and the stream was previously idle. 2349 * 2350 * The timer interval can be changed for individual 2351 * streams with the ND variable "mir_idle_timeout". 2352 * 2353 * If the stream is not idle do nothing. 2354 */ 2355 if (!MIR_SVC_QUIESCED(mir)) { 2356 mutex_exit(&mir->mir_mutex); 2357 return; 2358 } 2359 2360 notify = !mir->mir_inrservice; 2361 mutex_exit(&mir->mir_mutex); 2362 2363 /* 2364 * If there is no packet queued up in read queue, the stream 2365 * is really idle so notify nfsd to close it. 2366 */ 2367 if (notify) { 2368 RPCLOG(16, "mir_timer: telling stream head listener " 2369 "to close stream (0x%p)\n", (void *) RD(wq)); 2370 (void) mir_svc_policy_notify(RD(wq), 1); 2371 } 2372 return; 2373 default: 2374 RPCLOG(1, "mir_timer: unexpected mir_type %d\n", 2375 mir->mir_type); 2376 mutex_exit(&mir->mir_mutex); 2377 return; 2378 } 2379 } 2380 2381 /* 2382 * Called by the RPC package to send either a call or a return, or a 2383 * transport connection request. Adds the record marking header. 2384 */ 2385 static void 2386 mir_wput(queue_t *q, mblk_t *mp) 2387 { 2388 uint_t frag_header; 2389 mir_t *mir = (mir_t *)q->q_ptr; 2390 uchar_t *rptr = mp->b_rptr; 2391 2392 if (!mir) { 2393 freemsg(mp); 2394 return; 2395 } 2396 2397 if (mp->b_datap->db_type != M_DATA) { 2398 mir_wput_other(q, mp); 2399 return; 2400 } 2401 2402 if (mir->mir_ordrel_pending == 1) { 2403 freemsg(mp); 2404 RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n", 2405 (void *)q); 2406 return; 2407 } 2408 2409 frag_header = (uint_t)DLEN(mp); 2410 frag_header |= MIR_LASTFRAG; 2411 2412 /* Stick in the 4 byte record marking header. */ 2413 if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) || 2414 !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) { 2415 /* 2416 * Since we know that M_DATA messages are created exclusively 2417 * by KRPC, we expect that KRPC will leave room for our header 2418 * and 4 byte align which is normal for XDR. 2419 * If KRPC (or someone else) does not cooperate, then we 2420 * just throw away the message. 2421 */ 2422 RPCLOG(1, "mir_wput: KRPC did not leave space for record " 2423 "fragment header (%d bytes left)\n", 2424 (int)(rptr - mp->b_datap->db_base)); 2425 freemsg(mp); 2426 return; 2427 } 2428 rptr -= sizeof (uint32_t); 2429 *(uint32_t *)rptr = htonl(frag_header); 2430 mp->b_rptr = rptr; 2431 2432 mutex_enter(&mir->mir_mutex); 2433 if (mir->mir_type == RPC_CLIENT) { 2434 /* 2435 * For the client, set mir_clntreq to indicate that the 2436 * connection is active. 2437 */ 2438 mir->mir_clntreq = 1; 2439 mir->mir_use_timestamp = lbolt; 2440 } 2441 2442 /* 2443 * If we haven't already queued some data and the downstream module 2444 * can accept more data, send it on, otherwise we queue the message 2445 * and take other actions depending on mir_type. 2446 */ 2447 if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) { 2448 mutex_exit(&mir->mir_mutex); 2449 2450 /* 2451 * Now we pass the RPC message downstream. 2452 */ 2453 putnext(q, mp); 2454 return; 2455 } 2456 2457 switch (mir->mir_type) { 2458 case RPC_CLIENT: 2459 /* 2460 * Check for a previous duplicate request on the 2461 * queue. If there is one, then we throw away 2462 * the current message and let the previous one 2463 * go through. If we can't find a duplicate, then 2464 * send this one. This tap dance is an effort 2465 * to reduce traffic and processing requirements 2466 * under load conditions. 2467 */ 2468 if (mir_clnt_dup_request(q, mp)) { 2469 mutex_exit(&mir->mir_mutex); 2470 freemsg(mp); 2471 return; 2472 } 2473 break; 2474 case RPC_SERVER: 2475 /* 2476 * Set mir_hold_inbound so that new inbound RPC 2477 * messages will be held until the client catches 2478 * up on the earlier replies. This flag is cleared 2479 * in mir_wsrv after flow control is relieved; 2480 * the read-side queue is also enabled at that time. 2481 */ 2482 mir->mir_hold_inbound = 1; 2483 break; 2484 default: 2485 RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type); 2486 break; 2487 } 2488 mir->mir_inwservice = 1; 2489 (void) putq(q, mp); 2490 mutex_exit(&mir->mir_mutex); 2491 } 2492 2493 static void 2494 mir_wput_other(queue_t *q, mblk_t *mp) 2495 { 2496 mir_t *mir = (mir_t *)q->q_ptr; 2497 struct iocblk *iocp; 2498 uchar_t *rptr = mp->b_rptr; 2499 bool_t flush_in_svc = FALSE; 2500 2501 ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); 2502 switch (mp->b_datap->db_type) { 2503 case M_IOCTL: 2504 iocp = (struct iocblk *)rptr; 2505 switch (iocp->ioc_cmd) { 2506 case RPC_CLIENT: 2507 mutex_enter(&mir->mir_mutex); 2508 if (mir->mir_type != 0 && 2509 mir->mir_type != iocp->ioc_cmd) { 2510 ioc_eperm: 2511 mutex_exit(&mir->mir_mutex); 2512 iocp->ioc_error = EPERM; 2513 iocp->ioc_count = 0; 2514 mp->b_datap->db_type = M_IOCACK; 2515 qreply(q, mp); 2516 return; 2517 } 2518 2519 mir->mir_type = iocp->ioc_cmd; 2520 2521 /* 2522 * Clear mir_hold_inbound which was set to 1 by 2523 * mir_open. This flag is not used on client 2524 * streams. 2525 */ 2526 mir->mir_hold_inbound = 0; 2527 mir->mir_max_msg_sizep = &clnt_max_msg_size; 2528 2529 /* 2530 * Start the idle timer. See mir_timer() for more 2531 * information on how client timers work. 2532 */ 2533 mir->mir_idle_timeout = clnt_idle_timeout; 2534 mir_clnt_idle_start(q, mir); 2535 mutex_exit(&mir->mir_mutex); 2536 2537 mp->b_datap->db_type = M_IOCACK; 2538 qreply(q, mp); 2539 return; 2540 case RPC_SERVER: 2541 mutex_enter(&mir->mir_mutex); 2542 if (mir->mir_type != 0 && 2543 mir->mir_type != iocp->ioc_cmd) 2544 goto ioc_eperm; 2545 2546 /* 2547 * We don't clear mir_hold_inbound here because 2548 * mir_hold_inbound is used in the flow control 2549 * model. If we cleared it here, then we'd commit 2550 * a small violation to the model where the transport 2551 * might immediately block downstream flow. 2552 */ 2553 2554 mir->mir_type = iocp->ioc_cmd; 2555 mir->mir_max_msg_sizep = &svc_max_msg_size; 2556 2557 /* 2558 * Start the idle timer. See mir_timer() for more 2559 * information on how server timers work. 2560 * 2561 * Note that it is important to start the idle timer 2562 * here so that connections time out even if we 2563 * never receive any data on them. 2564 */ 2565 mir->mir_idle_timeout = svc_idle_timeout; 2566 RPCLOG(16, "mir_wput_other starting idle timer on 0x%p " 2567 "because we got RPC_SERVER ioctl\n", (void *)q); 2568 mir_svc_idle_start(q, mir); 2569 mutex_exit(&mir->mir_mutex); 2570 2571 mp->b_datap->db_type = M_IOCACK; 2572 qreply(q, mp); 2573 return; 2574 default: 2575 break; 2576 } 2577 break; 2578 2579 case M_PROTO: 2580 if (mir->mir_type == RPC_CLIENT) { 2581 /* 2582 * We are likely being called from the context of a 2583 * service procedure. So we need to enqueue. However 2584 * enqueing may put our message behind data messages. 2585 * So flush the data first. 2586 */ 2587 flush_in_svc = TRUE; 2588 } 2589 if ((mp->b_wptr - rptr) < sizeof (uint32_t) || 2590 !IS_P2ALIGNED(rptr, sizeof (uint32_t))) 2591 break; 2592 2593 switch (((union T_primitives *)rptr)->type) { 2594 case T_DATA_REQ: 2595 /* Don't pass T_DATA_REQ messages downstream. */ 2596 freemsg(mp); 2597 return; 2598 case T_ORDREL_REQ: 2599 RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n", 2600 (void *)q); 2601 mutex_enter(&mir->mir_mutex); 2602 if (mir->mir_type != RPC_SERVER) { 2603 /* 2604 * We are likely being called from 2605 * clnt_dispatch_notifyall(). Sending 2606 * a T_ORDREL_REQ will result in 2607 * a some kind of _IND message being sent, 2608 * will be another call to 2609 * clnt_dispatch_notifyall(). To keep the stack 2610 * lean, queue this message. 2611 */ 2612 mir->mir_inwservice = 1; 2613 (void) putq(q, mp); 2614 mutex_exit(&mir->mir_mutex); 2615 return; 2616 } 2617 2618 /* 2619 * Mark the structure such that we don't accept any 2620 * more requests from client. We could defer this 2621 * until we actually send the orderly release 2622 * request downstream, but all that does is delay 2623 * the closing of this stream. 2624 */ 2625 RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ " 2626 " so calling mir_svc_start_close\n", (void *)q); 2627 2628 mir_svc_start_close(q, mir); 2629 2630 /* 2631 * If we have sent down a T_ORDREL_REQ, don't send 2632 * any more. 2633 */ 2634 if (mir->mir_ordrel_pending) { 2635 freemsg(mp); 2636 mutex_exit(&mir->mir_mutex); 2637 return; 2638 } 2639 2640 /* 2641 * If the stream is not idle, then we hold the 2642 * orderly release until it becomes idle. This 2643 * ensures that KRPC will be able to reply to 2644 * all requests that we have passed to it. 2645 * 2646 * We also queue the request if there is data already 2647 * queued, because we cannot allow the T_ORDREL_REQ 2648 * to go before data. When we had a separate reply 2649 * count, this was not a problem, because the 2650 * reply count was reconciled when mir_wsrv() 2651 * completed. 2652 */ 2653 if (!MIR_SVC_QUIESCED(mir) || 2654 mir->mir_inwservice == 1) { 2655 mir->mir_inwservice = 1; 2656 (void) putq(q, mp); 2657 2658 RPCLOG(16, "mir_wput_other: queuing " 2659 "T_ORDREL_REQ on 0x%p\n", (void *)q); 2660 2661 mutex_exit(&mir->mir_mutex); 2662 return; 2663 } 2664 2665 /* 2666 * Mark the structure so that we know we sent 2667 * an orderly release request, and reset the idle timer. 2668 */ 2669 mir->mir_ordrel_pending = 1; 2670 2671 RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start" 2672 " on 0x%p because we got T_ORDREL_REQ\n", 2673 (void *)q); 2674 2675 mir_svc_idle_start(q, mir); 2676 mutex_exit(&mir->mir_mutex); 2677 2678 /* 2679 * When we break, we will putnext the T_ORDREL_REQ. 2680 */ 2681 break; 2682 2683 case T_CONN_REQ: 2684 mutex_enter(&mir->mir_mutex); 2685 if (mir->mir_head_mp != NULL) { 2686 freemsg(mir->mir_head_mp); 2687 mir->mir_head_mp = NULL; 2688 mir->mir_tail_mp = NULL; 2689 } 2690 mir->mir_frag_len = -(int32_t)sizeof (uint32_t); 2691 /* 2692 * Restart timer in case mir_clnt_idle_do_stop() was 2693 * called. 2694 */ 2695 mir->mir_idle_timeout = clnt_idle_timeout; 2696 mir_clnt_idle_stop(q, mir); 2697 mir_clnt_idle_start(q, mir); 2698 mutex_exit(&mir->mir_mutex); 2699 break; 2700 2701 default: 2702 /* 2703 * T_DISCON_REQ is one of the interesting default 2704 * cases here. Ideally, an M_FLUSH is done before 2705 * T_DISCON_REQ is done. However, that is somewhat 2706 * cumbersome for clnt_cots.c to do. So we queue 2707 * T_DISCON_REQ, and let the service procedure 2708 * flush all M_DATA. 2709 */ 2710 break; 2711 } 2712 /* fallthru */; 2713 default: 2714 if (mp->b_datap->db_type >= QPCTL) { 2715 if (mp->b_datap->db_type == M_FLUSH) { 2716 if (mir->mir_type == RPC_CLIENT && 2717 *mp->b_rptr & FLUSHW) { 2718 RPCLOG(32, "mir_wput_other: flushing " 2719 "wq 0x%p\n", (void *)q); 2720 if (*mp->b_rptr & FLUSHBAND) { 2721 flushband(q, *(mp->b_rptr + 1), 2722 FLUSHDATA); 2723 } else { 2724 flushq(q, FLUSHDATA); 2725 } 2726 } else { 2727 RPCLOG(32, "mir_wput_other: ignoring " 2728 "M_FLUSH on wq 0x%p\n", (void *)q); 2729 } 2730 } 2731 break; 2732 } 2733 2734 mutex_enter(&mir->mir_mutex); 2735 if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) { 2736 mutex_exit(&mir->mir_mutex); 2737 break; 2738 } 2739 mir->mir_inwservice = 1; 2740 mir->mir_inwflushdata = flush_in_svc; 2741 (void) putq(q, mp); 2742 mutex_exit(&mir->mir_mutex); 2743 qenable(q); 2744 2745 return; 2746 } 2747 putnext(q, mp); 2748 } 2749 2750 static void 2751 mir_wsrv(queue_t *q) 2752 { 2753 mblk_t *mp; 2754 mir_t *mir; 2755 bool_t flushdata; 2756 2757 mir = (mir_t *)q->q_ptr; 2758 mutex_enter(&mir->mir_mutex); 2759 2760 flushdata = mir->mir_inwflushdata; 2761 mir->mir_inwflushdata = 0; 2762 2763 while (mp = getq(q)) { 2764 if (mp->b_datap->db_type == M_DATA) { 2765 /* 2766 * Do not send any more data if we have sent 2767 * a T_ORDREL_REQ. 2768 */ 2769 if (flushdata || mir->mir_ordrel_pending == 1) { 2770 freemsg(mp); 2771 continue; 2772 } 2773 2774 /* 2775 * Make sure that the stream can really handle more 2776 * data. 2777 */ 2778 if (!MIR_WCANPUTNEXT(mir, q)) { 2779 (void) putbq(q, mp); 2780 mutex_exit(&mir->mir_mutex); 2781 return; 2782 } 2783 2784 /* 2785 * Now we pass the RPC message downstream. 2786 */ 2787 mutex_exit(&mir->mir_mutex); 2788 putnext(q, mp); 2789 mutex_enter(&mir->mir_mutex); 2790 continue; 2791 } 2792 2793 /* 2794 * This is not an RPC message, pass it downstream 2795 * (ignoring flow control) if the server side is not sending a 2796 * T_ORDREL_REQ downstream. 2797 */ 2798 if (mir->mir_type != RPC_SERVER || 2799 ((union T_primitives *)mp->b_rptr)->type != 2800 T_ORDREL_REQ) { 2801 mutex_exit(&mir->mir_mutex); 2802 putnext(q, mp); 2803 mutex_enter(&mir->mir_mutex); 2804 continue; 2805 } 2806 2807 if (mir->mir_ordrel_pending == 1) { 2808 /* 2809 * Don't send two T_ORDRELs 2810 */ 2811 freemsg(mp); 2812 continue; 2813 } 2814 2815 /* 2816 * Mark the structure so that we know we sent an orderly 2817 * release request. We will check to see slot is idle at the 2818 * end of this routine, and if so, reset the idle timer to 2819 * handle orderly release timeouts. 2820 */ 2821 mir->mir_ordrel_pending = 1; 2822 RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n", 2823 (void *)q); 2824 /* 2825 * Send the orderly release downstream. If there are other 2826 * pending replies we won't be able to send them. However, 2827 * the only reason we should send the orderly release is if 2828 * we were idle, or if an unusual event occurred. 2829 */ 2830 mutex_exit(&mir->mir_mutex); 2831 putnext(q, mp); 2832 mutex_enter(&mir->mir_mutex); 2833 } 2834 2835 if (q->q_first == NULL) 2836 /* 2837 * If we call mir_svc_idle_start() below, then 2838 * clearing mir_inwservice here will also result in 2839 * any thread waiting in mir_close() to be signaled. 2840 */ 2841 mir->mir_inwservice = 0; 2842 2843 if (mir->mir_type != RPC_SERVER) { 2844 mutex_exit(&mir->mir_mutex); 2845 return; 2846 } 2847 2848 /* 2849 * If idle we call mir_svc_idle_start to start the timer (or wakeup 2850 * a close). Also make sure not to start the idle timer on the 2851 * listener stream. This can cause nfsd to send an orderly release 2852 * command on the listener stream. 2853 */ 2854 if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) { 2855 RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p " 2856 "because mir slot is idle\n", (void *)q); 2857 mir_svc_idle_start(q, mir); 2858 } 2859 2860 /* 2861 * If outbound flow control has been relieved, then allow new 2862 * inbound requests to be processed. 2863 */ 2864 if (mir->mir_hold_inbound) { 2865 mir->mir_hold_inbound = 0; 2866 qenable(RD(q)); 2867 } 2868 mutex_exit(&mir->mir_mutex); 2869 } 2870 2871 static void 2872 mir_disconnect(queue_t *q, mir_t *mir) 2873 { 2874 ASSERT(MUTEX_HELD(&mir->mir_mutex)); 2875 2876 switch (mir->mir_type) { 2877 case RPC_CLIENT: 2878 /* 2879 * We are disconnecting, but not necessarily 2880 * closing. By not closing, we will fail to 2881 * pick up a possibly changed global timeout value, 2882 * unless we store it now. 2883 */ 2884 mir->mir_idle_timeout = clnt_idle_timeout; 2885 mir_clnt_idle_start(WR(q), mir); 2886 mutex_exit(&mir->mir_mutex); 2887 2888 /* 2889 * T_DISCON_REQ is passed to KRPC as an integer value 2890 * (this is not a TPI message). It is used as a 2891 * convenient value to indicate a sanity check 2892 * failure -- the same KRPC routine is also called 2893 * for T_DISCON_INDs and T_ORDREL_INDs. 2894 */ 2895 clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0); 2896 break; 2897 2898 case RPC_SERVER: 2899 mir->mir_svc_no_more_msgs = 1; 2900 mir_svc_idle_stop(WR(q), mir); 2901 mutex_exit(&mir->mir_mutex); 2902 RPCLOG(16, "mir_disconnect: telling " 2903 "stream head listener to disconnect stream " 2904 "(0x%p)\n", (void *) q); 2905 (void) mir_svc_policy_notify(q, 2); 2906 break; 2907 2908 default: 2909 mutex_exit(&mir->mir_mutex); 2910 break; 2911 } 2912 } 2913 2914 /* 2915 * Sanity check the message length, and if it's too large, shutdown the 2916 * connection. Returns 1 if the connection is shutdown; 0 otherwise. 2917 */ 2918 static int 2919 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp) 2920 { 2921 mir_t *mir = q->q_ptr; 2922 uint_t maxsize = 0; 2923 2924 if (mir->mir_max_msg_sizep != NULL) 2925 maxsize = *mir->mir_max_msg_sizep; 2926 2927 if (maxsize == 0 || frag_len <= (int)maxsize) 2928 return (0); 2929 2930 freemsg(head_mp); 2931 mir->mir_head_mp = NULL; 2932 mir->mir_tail_mp = NULL; 2933 mir->mir_frag_header = 0; 2934 mir->mir_frag_len = -(int32_t)sizeof (uint32_t); 2935 if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) { 2936 cmn_err(CE_NOTE, 2937 "KRPC: record fragment from %s of size(%d) exceeds " 2938 "maximum (%u). Disconnecting", 2939 (mir->mir_type == RPC_CLIENT) ? "server" : 2940 (mir->mir_type == RPC_SERVER) ? "client" : 2941 "test tool", frag_len, maxsize); 2942 } 2943 2944 mir_disconnect(q, mir); 2945 return (1); 2946 } 2947