/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2012 Milan Jurik. All rights reserved.
 * Copyright 2012 Marcel Telka <marcel@telka.sk>
 * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
 */
/* Copyright (c) 1990 Mentat Inc. */

/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Kernel RPC filtering module
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/strsubr.h>
#include <sys/tihdr.h>
#include <sys/timod.h>
#include <sys/tiuser.h>
#include <sys/debug.h>
#include <sys/signal.h>
#include <sys/pcb.h>
#include <sys/user.h>
#include <sys/errno.h>
#include <sys/cred.h>
#include <sys/policy.h>
#include <sys/inline.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/file.h>
#include <sys/sysmacros.h>
#include <sys/systm.h>
#include <sys/t_lock.h>
#include <sys/ddi.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/strsun.h>

#include <sys/strlog.h>
#include <rpc/rpc_com.h>
#include <inet/common.h>
#include <rpc/types.h>
#include <sys/time.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpcsys.h>
#include <rpc/rpc_rdma.h>

/*
 * This is the loadable module wrapper.
 */
#include <sys/conf.h>
#include <sys/modctl.h>
#include <sys/syscall.h>

extern struct streamtab rpcinfo;

static struct fmodsw fsw = {
	"rpcmod",
	&rpcinfo,
	D_NEW|D_MP,
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "rpc interface str mod", &fsw
};
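/*
 * Usage sketch (illustrative, not a complete consumer): rpcmod is a
 * STREAMS module, so a transport consumer pushes it onto an open
 * transport stream before doing anything else, e.g. from userland:
 *
 *	if (ioctl(fd, I_PUSH, "rpcmod") < 0)
 *		err(1, "I_PUSH rpcmod");
 *
 * The slot then remains typeless until a subsequent RPC_CLIENT or
 * RPC_SERVER ioctl selects the client- or server-side personality;
 * see rpcmodwput_other() below.
 */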
/*
 * For the RPC system call.
 */
static struct sysent rpcsysent = {
	2,
	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
	rpcsys
};

static struct modlsys modlsys = {
	&mod_syscallops,
	"RPC syscall",
	&rpcsysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
	&mod_syscallops32,
	"32-bit RPC syscall",
	&rpcsysent
};
#endif /* _SYSCALL32_IMPL */

static struct modlinkage modlinkage = {
	MODREV_1,
	{
		&modlsys,
#ifdef _SYSCALL32_IMPL
		&modlsys32,
#endif
		&modlstrmod,
		NULL
	}
};

int
_init(void)
{
	int error = 0;
	callb_id_t cid;
	int status;

	svc_init();
	clnt_init();
	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");

	if (error = mod_install(&modlinkage)) {
		/*
		 * Could not install module, cleanup previous
		 * initialization work.
		 */
		clnt_fini();
		if (cid != NULL)
			(void) callb_delete(cid);

		return (error);
	}

	/*
	 * Load up the RDMA plugins and initialize the stats. Even if the
	 * plugin load fails, the counters are still initialized as long
	 * as rpcmod itself was successfully installed.
	 */
	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);

	mt_kstat_init();

	/*
	 * Get our identification into ldi.  This is used for loading
	 * other modules, e.g. rpcib.
	 */
	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
	if (status != 0) {
		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
		rpcmod_li = NULL;
	}

	return (error);
}

/*
 * The unload entry point fails because we advertise entry points into
 * rpcmod from the rest of kRPC: rpcmod_release().
 */
int
_fini(void)
{
	return (EBUSY);
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}

extern int nulldev();

#define	RPCMOD_ID	2049

int rmm_open(queue_t *, dev_t *, int, int, cred_t *);
int rmm_close(queue_t *, int, cred_t *);

/*
 * To save instructions, since STREAMS ignores the return value
 * from these functions, they are defined as void here. Kind of icky, but...
 */
void rmm_rput(queue_t *, mblk_t *);
void rmm_wput(queue_t *, mblk_t *);
void rmm_rsrv(queue_t *);
void rmm_wsrv(queue_t *);

int rpcmodopen(queue_t *, dev_t *, int, int, cred_t *);
int rpcmodclose(queue_t *, int, cred_t *);
void rpcmodrput(queue_t *, mblk_t *);
void rpcmodwput(queue_t *, mblk_t *);
void rpcmodrsrv();
void rpcmodwsrv(queue_t *);

static void rpcmodwput_other(queue_t *, mblk_t *);
static int mir_close(queue_t *q);
static int mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
    cred_t *credp);
static void mir_rput(queue_t *q, mblk_t *mp);
static void mir_rsrv(queue_t *q);
static void mir_wput(queue_t *q, mblk_t *mp);
static void mir_wsrv(queue_t *q);

static struct module_info rpcmod_info =
	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};

static struct qinit rpcmodrinit = {
	(int (*)())rmm_rput,
	(int (*)())rmm_rsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
/*
 * The write put procedure is simply putnext to conserve stack space.
 * The write service procedure is not used to queue data, but instead to
 * synchronize with flow control.
 */
static struct qinit rpcmodwinit = {
	(int (*)())rmm_wput,
	(int (*)())rmm_wsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };

struct xprt_style_ops {
	int (*xo_open)();
	int (*xo_close)();
	void (*xo_wput)();
	void (*xo_wsrv)();
	void (*xo_rput)();
	void (*xo_rsrv)();
};

/*
 * Read side has no service procedure.
 */
static struct xprt_style_ops xprt_clts_ops = {
	rpcmodopen,
	rpcmodclose,
	rpcmodwput,
	rpcmodwsrv,
	rpcmodrput,
	NULL
};

static struct xprt_style_ops xprt_cots_ops = {
	mir_open,
	mir_close,
	mir_wput,
	mir_wsrv,
	mir_rput,
	mir_rsrv
};

/*
 * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 */
struct rpcm {
	void		*rm_krpc_cell;	/* Reserved for use by kRPC */
	struct xprt_style_ops	*rm_ops;
	int		rm_type;	/* Client or server side stream */
#define	RM_CLOSING	0x1		/* somebody is trying to close slot */
	uint_t		rm_state;	/* state of the slot. see above */
	uint_t		rm_ref;		/* cnt of external references to slot */
	kmutex_t	rm_lock;	/* mutex protecting above fields */
	kcondvar_t	rm_cwait;	/* condition for closing */
	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
};

struct temp_slot {
	void *cell;
	struct xprt_style_ops *ops;
	int type;
	mblk_t *info_ack;
	kmutex_t lock;
	kcondvar_t wait;
};
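/*
 * Note on struct layout (an inference from rmm_open() below, not
 * commentary carried over from the original source): struct temp_slot
 * mirrors the three leading members of both struct rpcm and mir_t
 * (cell/ops/type), so a queue's q_ptr can be treated as a temp_slot
 * during the T_INFO_REQ handshake in rmm_open() and then be replaced
 * by the real per-stream structure once the transport type is known.
 */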
typedef struct mir_s {
	void	*mir_krpc_cell;	/* Reserved for kRPC use.  This field */
				/* must be first in the structure. */
	struct xprt_style_ops	*rm_ops;
	int	mir_type;	/* Client or server side stream */

	mblk_t	*mir_head_mp;		/* RPC msg in progress */
	/*
	 * mir_head_mp points to the first mblk being collected in
	 * the current RPC message.  Record headers are removed
	 * before data is linked into mir_head_mp.
	 */
	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
	/*
	 * mir_tail_mp points to the last mblk in the message
	 * chain starting at mir_head_mp.  It is only valid
	 * if mir_head_mp is non-NULL and is used to add new
	 * data blocks to the end of chain quickly.
	 */

	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
	/*
	 * mir_frag_len starts at -4 for beginning of each fragment.
	 * When this length is negative, it indicates the number of
	 * bytes that rpcmod needs to complete the record marker
	 * header.  When it is positive or zero, it holds the number
	 * of bytes that have arrived for the current fragment and
	 * are held in mir_head_mp.
	 */

	int32_t	mir_frag_header;
	/*
	 * Fragment header as collected for the current fragment.
	 * It holds the last-fragment indicator and the number
	 * of bytes in the fragment.
	 */

	unsigned int
		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
		mir_hold_inbound : 1,	/* Hold inbound messages on server */
					/* side until outbound flow control */
					/* is relieved. */
		mir_closing : 1,	/* The stream is being closed */
		mir_inrservice : 1,	/* data queued or rd srv proc running */
		mir_inwservice : 1,	/* data queued or wr srv proc running */
		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
		/*
		 * On client streams, mir_clntreq is 0 or 1; it is set
		 * to 1 whenever a new request is sent out (mir_wput)
		 * and cleared when the timer fires (mir_timer).  If
		 * the timer fires with this value equal to 0, then the
		 * stream is considered idle and kRPC is notified.
		 */
		mir_clntreq : 1,
		/*
		 * On server streams, stop accepting messages
		 */
		mir_svc_no_more_msgs : 1,
		mir_listen_stream : 1,	/* listen end point */
		mir_unused : 1,		/* no longer used */
		mir_timer_call : 1,
		mir_junk_fill_thru_bit_31 : 21;

	int	mir_setup_complete;	/* server has initialized everything */
	timeout_id_t mir_timer_id;	/* Timer for idle checks */
	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
	/*
	 * This value is copied from clnt_idle_timeout or
	 * svc_idle_timeout during the appropriate ioctl.
	 * Kept in milliseconds
	 */
	clock_t	mir_use_timestamp;	/* updated on client with each use */
	/*
	 * This value is set to lbolt
	 * every time a client stream sends or receives data.
	 * Even if the timer message arrives, we don't shutdown
	 * client unless:
	 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
	 * This value is kept in HZ.
	 */

	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
	/*
	 * This pointer is set to &clnt_max_msg_size or
	 * &svc_max_msg_size during the appropriate ioctl.
	 */
	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
	/* Server-side fields. */
	int	mir_ref_cnt;		/* Reference count: server side only */
					/* counts the number of references */
					/* that a kernel RPC server thread */
					/* (see svc_run()) has on this rpcmod */
					/* slot. Effectively, it is the */
					/* number of unprocessed messages */
					/* that have been passed up to the */
					/* kRPC layer */

	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
					/* T_DISCON_IND */

	/*
	 * these fields are for both client and server, but for debugging,
	 * it is easier to have these last in the structure.
	 */
	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
	kcondvar_t	mir_condvar;	/* synchronization. */
	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
} mir_t;
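/*
 * Worked example of the record-marking state (an illustration added
 * for clarity, not original commentary): suppose a 100-byte last
 * fragment arrives.  mir_frag_len begins at -4; as each of the four
 * record-mark bytes is consumed it is shifted into mir_frag_header
 * and mir_frag_len is incremented, so after the header
 *
 *	mir_frag_header == 0x80000064	(MIR_LASTFRAG | 100)
 *	mir_frag_len    == 0
 *
 * and mir_frag_len then grows with each data byte until it reaches
 * (mir_frag_header & ~MIR_LASTFRAG), i.e. 100, completing the
 * fragment.  See the accumulation loop in mir_rput().
 */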
void tmp_rput(queue_t *q, mblk_t *mp);

struct xprt_style_ops tmpops = {
	NULL,
	NULL,
	putnext,
	NULL,
	tmp_rput,
	NULL
};

void
tmp_rput(queue_t *q, mblk_t *mp)
{
	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
	struct T_info_ack *pptr;

	switch (mp->b_datap->db_type) {
	case M_PCPROTO:
		pptr = (struct T_info_ack *)mp->b_rptr;
		switch (pptr->PRIM_type) {
		case T_INFO_ACK:
			mutex_enter(&t->lock);
			t->info_ack = mp;
			cv_signal(&t->wait);
			mutex_exit(&t->lock);
			return;
		default:
			break;
		}
	default:
		break;
	}

	/*
	 * Not an info-ack, so free it.  This is ok because we should
	 * not be receiving data until the open finishes: rpcmod
	 * is pushed well before the end-point is bound to an address.
	 */
	freemsg(mp);
}

int
rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	mblk_t *bp;
	struct temp_slot ts, *t;
	struct T_info_ack *pptr;
	int error = 0;

	ASSERT(q != NULL);
	/*
	 * Check for re-opens.
	 */
	if (q->q_ptr) {
		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
		    "rpcmodopen_end:(%s)", "q->qptr");
		return (0);
	}

	t = &ts;
	bzero(t, sizeof (*t));
	q->q_ptr = (void *)t;
	WR(q)->q_ptr = (void *)t;

	/*
	 * Allocate the required messages upfront.
	 */
	if ((bp = allocb_cred(sizeof (struct T_info_req) +
	    sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
		return (ENOBUFS);
	}

	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);

	t->ops = &tmpops;

	qprocson(q);
	bp->b_datap->db_type = M_PCPROTO;
	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
	bp->b_wptr += sizeof (struct T_info_req);
	putnext(WR(q), bp);

	mutex_enter(&t->lock);
	while (t->info_ack == NULL) {
		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
			error = EINTR;
			break;
		}
	}
	mutex_exit(&t->lock);

	if (error)
		goto out;

	pptr = (struct T_info_ack *)t->info_ack->b_rptr;

	if (pptr->SERV_type == T_CLTS) {
		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
	} else {
		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
	}

out:
	if (error)
		qprocsoff(q);

	freemsg(t->info_ack);
	mutex_destroy(&t->lock);
	cv_destroy(&t->wait);

	return (error);
}

void
rmm_rput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
}

void
rmm_rsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
}

void
rmm_wput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
}

void
rmm_wsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
}

int
rmm_close(queue_t *q, int flag, cred_t *crp)
{
	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}

/*
 * rpcmodopen -	open routine gets called when the module gets pushed
 *		onto the stream.
 */
/*ARGSUSED*/
int
rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	struct rpcm *rmp;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");

	/*
	 * Only sufficiently privileged users can use this module, and it
	 * is assumed that they will use this module properly, and NOT send
	 * bulk data from downstream.
	 */
	if (secpolicy_rpcmod_open(crp) != 0)
		return (EPERM);

	/*
	 * Allocate slot data structure.
	 */
	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);

	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
	rmp->rm_zoneid = rpc_zoneid();
	/*
	 * slot type will be set by kRPC client and server ioctl's
	 */
	rmp->rm_type = 0;

	q->q_ptr = (void *)rmp;
	WR(q)->q_ptr = (void *)rmp;

	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
	return (0);
}
/*
 * rpcmodclose - This routine gets called when the module gets popped
 * off of the stream.
 */
/*ARGSUSED*/
int
rpcmodclose(queue_t *q, int flag, cred_t *crp)
{
	struct rpcm *rmp;

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	/*
	 * Mark our state as closing.
	 */
	mutex_enter(&rmp->rm_lock);
	rmp->rm_state |= RM_CLOSING;

	/*
	 * Check and see if there are any messages on the queue.  If so, send
	 * the messages, regardless of whether the downstream module is ready
	 * to accept data.
	 */
	if (rmp->rm_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);

		qenable(WR(q));

		if (rmp->rm_ref) {
			mutex_exit(&rmp->rm_lock);
			/*
			 * call into SVC to clean the queue
			 */
			svc_queueclean(q);
			mutex_enter(&rmp->rm_lock);

			/*
			 * Block while there are kRPC threads with a reference
			 * to this message.
			 */
			while (rmp->rm_ref)
				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
		}

		mutex_exit(&rmp->rm_lock);

		/*
		 * It is now safe to remove this queue from the stream. No kRPC
		 * threads have a reference to the stream, and none ever will,
		 * because RM_CLOSING is set.
		 */
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&rmp->rm_lock);
		qprocsoff(q);
	}

	q->q_ptr = NULL;
	WR(q)->q_ptr = NULL;
	mutex_destroy(&rmp->rm_lock);
	cv_destroy(&rmp->rm_cwait);
	kmem_free(rmp, sizeof (*rmp));
	return (0);
}
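/*
 * A note on the close protocol (added commentary, inferred from the
 * code above and rpcmod_release() below): rm_ref counts requests that
 * kRPC server threads are still processing.  rpcmodclose() waits on
 * rm_cwait until rpcmod_release() drops rm_ref to zero, and no new
 * references can appear because rpcmodrput() checks RM_CLOSING before
 * handing any message to kRPC.
 */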
/*
 * rpcmodrput -	Module read put procedure.  This is called from
 *		the module, driver, or stream head downstream.
 */
void
rpcmodrput(queue_t *q, mblk_t *mp)
{
	struct rpcm *rmp;
	union T_primitives *pptr;
	int hdrsz;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	if (rmp->rm_type == 0) {
		freemsg(mp);
		return;
	}

	switch (mp->b_datap->db_type) {
	default:
		putnext(q, mp);
		break;

	case M_PROTO:
	case M_PCPROTO:
		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
		pptr = (union T_primitives *)mp->b_rptr;

		/*
		 * Forward this message to kRPC if it is data.
		 */
		if (pptr->type == T_UNITDATA_IND) {
			/*
			 * Check if the module is being popped.
			 */
			mutex_enter(&rmp->rm_lock);
			if (rmp->rm_state & RM_CLOSING) {
				mutex_exit(&rmp->rm_lock);
				putnext(q, mp);
				break;
			}

			switch (rmp->rm_type) {
			case RPC_CLIENT:
				mutex_exit(&rmp->rm_lock);
				hdrsz = mp->b_wptr - mp->b_rptr;

				/*
				 * Make sure the header is sane.
				 */
				if (hdrsz < TUNITDATAINDSZ ||
				    hdrsz < (pptr->unitdata_ind.OPT_length +
				    pptr->unitdata_ind.OPT_offset) ||
				    hdrsz < (pptr->unitdata_ind.SRC_length +
				    pptr->unitdata_ind.SRC_offset)) {
					freemsg(mp);
					return;
				}

				/*
				 * Call clnt_clts_dispatch_notify, so that it
				 * can pass the message to the proper caller.
				 * Don't discard the header just yet since the
				 * client may need the sender's address.
				 */
				clnt_clts_dispatch_notify(mp, hdrsz,
				    rmp->rm_zoneid);
				return;
			case RPC_SERVER:
				/*
				 * rm_krpc_cell is exclusively used by the kRPC
				 * CLTS server.  Try to submit the message to
				 * kRPC.  Since this is an unreliable channel,
				 * we can just free the message in case kRPC
				 * does not accept new messages.
				 */
				if (rmp->rm_krpc_cell &&
				    svc_queuereq(q, mp, TRUE)) {
					/*
					 * Raise the reference count on this
					 * module to prevent it from being
					 * popped before kRPC generates the
					 * reply.
					 */
					rmp->rm_ref++;
					mutex_exit(&rmp->rm_lock);
				} else {
					mutex_exit(&rmp->rm_lock);
					freemsg(mp);
				}
				return;
			default:
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			} /* end switch(rmp->rm_type) */
		} else if (pptr->type == T_UDERROR_IND) {
			mutex_enter(&rmp->rm_lock);
			hdrsz = mp->b_wptr - mp->b_rptr;

			/*
			 * Make sure the header is sane
			 */
			if (hdrsz < TUDERRORINDSZ ||
			    hdrsz < (pptr->uderror_ind.OPT_length +
			    pptr->uderror_ind.OPT_offset) ||
			    hdrsz < (pptr->uderror_ind.DEST_length +
			    pptr->uderror_ind.DEST_offset)) {
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			}

			/*
			 * In the case where a unit data error has been
			 * received, all we need to do is clear the message from
			 * the queue.
			 */
			mutex_exit(&rmp->rm_lock);
			freemsg(mp);
			RPCLOG(32, "rpcmodrput: unitdata error received at "
			    "%ld\n", gethrestime_sec());
			return;
		} /* end else if (pptr->type == T_UDERROR_IND) */

		putnext(q, mp);
		break;
	} /* end switch (mp->b_datap->db_type) */

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
	    "rpcmodrput_end:");
	/*
	 * Return codes are not looked at by the STREAMS framework.
	 */
}

/*
 * write put procedure
 */
void
rpcmodwput(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;

	ASSERT(q != NULL);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
	case M_PCPROTO:
		break;
	default:
		rpcmodwput_other(q, mp);
		return;
	}

	/*
	 * Check to see if we can send the message downstream.
	 */
	if (canputnext(q)) {
		putnext(q, mp);
		return;
	}

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * The first canputnext failed.  Try again except this time with the
	 * lock held, so that we can check the state of the stream to see if
	 * it is closing.  If either of these conditions evaluates to true
	 * then send the message.
	 */
	mutex_enter(&rmp->rm_lock);
	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
		mutex_exit(&rmp->rm_lock);
		putnext(q, mp);
	} else {
		/*
		 * canputnext failed again and the stream is not closing.
		 * Place the message on the queue and let the service
		 * procedure handle the message.
		 */
		mutex_exit(&rmp->rm_lock);
		(void) putq(q, mp);
	}
}

static void
rpcmodwput_other(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;
	struct iocblk	*iocp;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		iocp = (struct iocblk *)mp->b_rptr;
		ASSERT(iocp != NULL);
		switch (iocp->ioc_cmd) {
		case RPC_CLIENT:
		case RPC_SERVER:
			mutex_enter(&rmp->rm_lock);
			rmp->rm_type = iocp->ioc_cmd;
			mutex_exit(&rmp->rm_lock);
			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		default:
			/*
			 * pass the ioctl downstream and hope someone
			 * down there knows how to handle it.
			 */
			putnext(q, mp);
			return;
		}
	default:
		break;
	}
	/*
	 * This is something we definitely do not know how to handle, just
	 * pass the message downstream
	 */
	putnext(q, mp);
}
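/*
 * Added note: the RPC_CLIENT/RPC_SERVER M_IOCTL handled above is what
 * flips a freshly pushed slot out of its typeless state (rm_type == 0,
 * in which rpcmodrput() simply drops inbound data).  The kRPC client
 * and server code issue this ioctl right after pushing rpcmod; the
 * exact call path is outside this file.
 */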
/*
 * Module write service procedure.  This is called by downstream modules
 * for back enabling during flow control.
 */
void
rpcmodwsrv(queue_t *q)
{
	struct rpcm	*rmp;
	mblk_t		*mp = NULL;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * Get messages that may be queued and send them down stream
	 */
	while ((mp = getq(q)) != NULL) {
		/*
		 * Optimize the service procedure for the server-side, by
		 * avoiding a call to canputnext().
		 */
		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
			putnext(q, mp);
			continue;
		}
		(void) putbq(q, mp);
		return;
	}
}

void
rpcmod_hold(queue_t *q)
{
	struct rpcm *rmp = (struct rpcm *)q->q_ptr;

	mutex_enter(&rmp->rm_lock);
	rmp->rm_ref++;
	mutex_exit(&rmp->rm_lock);
}

void
rpcmod_release(queue_t *q, mblk_t *bp,
    /* LINTED E_FUNC_ARG_UNUSED */
    bool_t enable __unused)
{
	struct rpcm *rmp;

	/*
	 * For now, just free the message.
	 */
	if (bp)
		freemsg(bp);
	rmp = (struct rpcm *)q->q_ptr;

	mutex_enter(&rmp->rm_lock);
	rmp->rm_ref--;

	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
		cv_broadcast(&rmp->rm_cwait);
	}

	mutex_exit(&rmp->rm_lock);
}

/*
 * This part of rpcmod is pushed on a connection-oriented transport for use
 * by RPC.  It serves to bypass the Stream head, implement the record
 * marking protocol, and dispatch incoming RPC messages.
 */

/* Default idle timer values */
#define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
#define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
#define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
#define	MIR_LASTFRAG	0x80000000	/* Record marker */

#define	MIR_SVC_QUIESCED(mir)	\
	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)

#define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
	(mir_ptr)->mir_inrservice = 0;	\
	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
		(mir_ptr)->mir_closing)	\
		cv_signal(&(mir_ptr)->mir_condvar);	\
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
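/*
 * Illustrative sketch of the record mark that MIR_LASTFRAG describes
 * (added for clarity; the real producer/consumer are mir_wput() and
 * mir_rput()): each fragment is preceded by a 4-byte big-endian word
 * whose top bit flags the last fragment of an RPC message and whose
 * low 31 bits carry the fragment length:
 *
 *	uint32_t rm = htonl(MIR_LASTFRAG | len);	(encode)
 *	uint32_t hdr = ntohl(rm);			(decode)
 *	len  = hdr & ~MIR_LASTFRAG;
 *	last = (hdr & MIR_LASTFRAG) != 0;
 */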
static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void	mir_rput_proto(queue_t *q, mblk_t *mp);
static int	mir_svc_policy_notify(queue_t *q, int event);
static void	mir_svc_start(queue_t *wq);
static void	mir_svc_idle_start(queue_t *, mir_t *);
static void	mir_svc_idle_stop(queue_t *, mir_t *);
static void	mir_svc_start_close(queue_t *, mir_t *);
static void	mir_clnt_idle_do_stop(queue_t *);
static void	mir_clnt_idle_stop(queue_t *, mir_t *);
static void	mir_clnt_idle_start(queue_t *, mir_t *);
static void	mir_wput(queue_t *q, mblk_t *mp);
static void	mir_wput_other(queue_t *q, mblk_t *mp);
static void	mir_wsrv(queue_t *q);
static void	mir_disconnect(queue_t *, mir_t *ir);
static int	mir_check_len(queue_t *, mblk_t *);
static void	mir_timer(void *);

extern void	(*mir_start)(queue_t *);
extern void	(*clnt_stop_idle)(queue_t *);

clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;

/*
 * Timeout for subsequent notifications of idle connection.  This is
 * typically used to clean up after a wedged orderly release.
 */
clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT;	/* milliseconds */

extern uint_t	*clnt_max_msg_sizep;
extern uint_t	*svc_max_msg_sizep;
uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
uint_t	mir_krpc_cell_null;

static void
mir_timer_stop(mir_t *mir)
{
	timeout_id_t tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	/*
	 * Since the mir_mutex lock needs to be released to call
	 * untimeout(), we need to make sure that no other thread
	 * can start/stop the timer (changing mir_timer_id) during
	 * that time.  The mir_timer_call bit and the mir_timer_cv
	 * condition variable are used to synchronize this.  Setting
	 * mir_timer_call also tells mir_timer() (refer to the comments
	 * in mir_timer()) that it does not need to do anything.
	 */
	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mir->mir_timer_id = 0;
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}

static void
mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
{
	timeout_id_t tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	/* Only start the timer when it is not closing. */
	if (!mir->mir_closing) {
		mir->mir_timer_id = timeout(mir_timer, q,
		    MSEC_TO_TICK(intrvl));
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}
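/*
 * Added commentary for the function below: messages parked on the
 * write queue still carry their 4-byte record mark at b_rptr, so the
 * RPC transaction id (the first word of the call body) sits at byte
 * offset 4.  mir_clnt_dup_request() walks the flow-controlled requests
 * and reports whether one with the same XID is already queued, i.e.
 * whether the new request is a retransmission of a queued one.
 */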
static int
mir_clnt_dup_request(queue_t *q, mblk_t *mp)
{
	mblk_t	*mp1;
	uint32_t	new_xid;
	uint32_t	old_xid;

	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
	/*
	 * This loop is a bit tacky -- it walks the STREAMS list of
	 * flow-controlled messages.
	 */
	if ((mp1 = q->q_first) != NULL) {
		do {
			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
			if (new_xid == old_xid)
				return (1);
		} while ((mp1 = mp1->b_next) != NULL);
	}
	return (0);
}

static int
mir_close(queue_t *q)
{
	mir_t	*mir = q->q_ptr;
	mblk_t	*mp;
	bool_t queue_cleaned = FALSE;

	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	mutex_enter(&mir->mir_mutex);
	if ((mp = mir->mir_head_mp) != NULL) {
		mir->mir_head_mp = NULL;
		mir->mir_tail_mp = NULL;
		freemsg(mp);
	}
	/*
	 * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
	 * becomes TRUE, and so that mir_timer_start() won't restart
	 * the timer.
	 */
	mir->mir_closing = B_TRUE;
	mir_timer_stop(mir);

	if (mir->mir_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */

		/*
		 * This will prevent more requests from arriving and
		 * will force rpcmod to ignore flow control.
		 */
		mir_svc_start_close(WR(q), mir);

		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {

			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
			    (queue_cleaned == FALSE)) {
				/*
				 * call into SVC to clean the queue
				 */
				mutex_exit(&mir->mir_mutex);
				svc_queueclean(q);
				queue_cleaned = TRUE;
				mutex_enter(&mir->mir_mutex);
				continue;
			}

			/*
			 * Bugid 1253810 - Force the write service
			 * procedure to send its messages, regardless
			 * of whether the downstream module is ready
			 * to accept data.
			 */
			if (mir->mir_inwservice == 1)
				qenable(WR(q));

			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
		}

		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);
	}

	mutex_destroy(&mir->mir_mutex);
	cv_destroy(&mir->mir_condvar);
	cv_destroy(&mir->mir_timer_cv);
	kmem_free(mir, sizeof (mir_t));
	return (0);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Exit idle mode.
 */
static void
mir_svc_idle_stop(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);

	mir_timer_stop(mir);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Start idle processing, which will include setting idle timer if the
 * stream is not being closed.
 */
static void
mir_svc_idle_start(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);

	/*
	 * Don't re-start idle timer if we are closing queues.
	 */
	if (mir->mir_closing) {
		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
		    (void *)q);

		/*
		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
		 * is true.  When it is true, and we are in the process of
		 * closing the stream, signal any thread waiting in
		 * mir_close().
		 */
		if (mir->mir_inwservice == 0)
			cv_signal(&mir->mir_condvar);

	} else {
		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
		    mir->mir_ordrel_pending ? "ordrel" : "normal");
		/*
		 * Normal condition, start the idle timer.  If an orderly
		 * release has been sent, set the timeout to wait for the
		 * client to close its side of the connection.  Otherwise,
		 * use the normal idle timeout.
		 */
		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
		    svc_ordrel_timeout : mir->mir_idle_timeout);
	}
}
/* ARGSUSED */
static int
mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
{
	mir_t	*mir;

	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
	/* Set variables used directly by kRPC. */
	if (!mir_start)
		mir_start = mir_svc_start;
	if (!clnt_stop_idle)
		clnt_stop_idle = mir_clnt_idle_do_stop;
	if (!clnt_max_msg_sizep)
		clnt_max_msg_sizep = &clnt_max_msg_size;
	if (!svc_max_msg_sizep)
		svc_max_msg_sizep = &svc_max_msg_size;

	/* Allocate a zeroed-out mir structure for this stream. */
	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);

	/*
	 * We set hold inbound here so that incoming messages will
	 * be held on the read-side queue until the stream is completely
	 * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
	 * the ioctl processing, the flag is cleared and any messages that
	 * arrived between the open and the ioctl are delivered to kRPC.
	 *
	 * Early data should never arrive on a client stream since
	 * servers only respond to our requests and we do not send any
	 * until after the stream is initialized.  Early data is
	 * very common on a server stream where the client will start
	 * sending data as soon as the connection is made (and this
	 * is especially true with TCP where the protocol accepts the
	 * connection before nfsd or kRPC is notified about it).
	 */

	mir->mir_hold_inbound = 1;

	/*
	 * Start the record marker looking for a 4-byte header.  When
	 * this length is negative, it indicates that rpcmod is looking
	 * for bytes to consume for the record marker header.  When it
	 * is positive, it holds the number of bytes that have arrived
	 * for the current fragment and are being held in mir_head_mp.
	 */

	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);

	mir->mir_zoneid = rpc_zoneid();
	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);

	q->q_ptr = (char *)mir;
	WR(q)->q_ptr = (char *)mir;

	/*
	 * We noenable the read-side queue because we don't want it
	 * automatically enabled by putq.  We enable it explicitly
	 * in mir_wsrv when appropriate. (See additional comments on
	 * flow control at the beginning of mir_rsrv.)
	 */
	noenable(q);

	qprocson(q);
	return (0);
}
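/*
 * Added worked example for the reassembly logic below: if a 12-byte
 * last fragment arrives split across two mblks as 2 bytes + 14 bytes,
 * the first pass consumes both available header bytes from mblk one
 * (frag_len goes -4 -> -2) and frees it; the second pass consumes the
 * remaining two header bytes from mblk two (frag_len -> 0,
 * frag_header == 0x8000000c), links the mblk into head_mp, counts its
 * 12 data bytes, and completes the fragment with no excess.
 */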
/*
 * Read-side put routine for both the client and server side.  Does the
 * record marking for incoming RPC messages, and when complete, dispatches
 * the message to either the client or server.
 */
static void
mir_rput(queue_t *q, mblk_t *mp)
{
	int	excess;
	int32_t	frag_len, frag_header;
	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
	mir_t	*mir = q->q_ptr;
	boolean_t stop_timer = B_FALSE;

	ASSERT(mir != NULL);

	/*
	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
	 * with the corresponding ioctl, then don't accept
	 * any inbound data.  This should never happen for streams
	 * created by nfsd or client-side kRPC because they are careful
	 * to set the mode of the stream before doing anything else.
	 */
	if (mir->mir_type == 0) {
		freemsg(mp);
		return;
	}

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	switch (mp->b_datap->db_type) {
	case M_DATA:
		break;
	case M_PROTO:
	case M_PCPROTO:
		if (MBLKL(mp) < sizeof (t_scalar_t)) {
			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
			    (int)MBLKL(mp));
			freemsg(mp);
			return;
		}
		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
			mir_rput_proto(q, mp);
			return;
		}

		/* Throw away the T_DATA_IND block and continue with data. */
		mp1 = mp;
		mp = mp->b_cont;
		freeb(mp1);
		break;
	case M_SETOPTS:
		/*
		 * If a module on the stream is trying to set the Stream head's
		 * high water mark, then set our hiwater to the requested
		 * value.  We are the "stream head" for all inbound
		 * data messages since messages are passed directly to kRPC.
		 */
		if (MBLKL(mp) >= sizeof (struct stroptions)) {
			struct stroptions	*stropts;

			stropts = (struct stroptions *)mp->b_rptr;
			if ((stropts->so_flags & SO_HIWAT) &&
			    !(stropts->so_flags & SO_BAND)) {
				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
			}
		}
		putnext(q, mp);
		return;
	case M_FLUSH:
		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
		RPCLOG(32, "on q 0x%p\n", (void *)q);
		putnext(q, mp);
		return;
	default:
		putnext(q, mp);
		return;
	}

	mutex_enter(&mir->mir_mutex);

	/*
	 * If this connection is closing, don't accept any new messages.
	 */
	if (mir->mir_svc_no_more_msgs) {
		ASSERT(mir->mir_type == RPC_SERVER);
		mutex_exit(&mir->mir_mutex);
		freemsg(mp);
		return;
	}

	/* Get local copies for quicker access. */
	frag_len = mir->mir_frag_len;
	frag_header = mir->mir_frag_header;
	head_mp = mir->mir_head_mp;
	tail_mp = mir->mir_tail_mp;

	/* Loop, processing each message block in the mp chain separately. */
	do {
		cont_mp = mp->b_cont;
		mp->b_cont = NULL;

		/*
		 * Drop zero-length mblks to prevent unbounded kernel memory
		 * consumption.
		 */
		if (MBLKL(mp) == 0) {
			freeb(mp);
			continue;
		}

		/*
		 * If frag_len is negative, we're still in the process of
		 * building frag_header -- try to complete it with this mblk.
		 */
		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
			frag_len++;
			frag_header <<= 8;
			frag_header += *mp->b_rptr++;
		}

		if (MBLKL(mp) == 0 && frag_len < 0) {
			/*
			 * We consumed this mblk while trying to complete the
			 * fragment header.  Free it and move on.
			 */
			freeb(mp);
			continue;
		}

		ASSERT(frag_len >= 0);

		/*
		 * Now frag_header has the number of bytes in this fragment
		 * and we're just waiting to collect them all.  Chain our
		 * latest mblk onto the list and see if we now have enough
		 * bytes to complete the fragment.
		 */
		if (head_mp == NULL) {
			ASSERT(tail_mp == NULL);
			head_mp = tail_mp = mp;
		} else {
			tail_mp->b_cont = mp;
			tail_mp = mp;
		}

		frag_len += MBLKL(mp);
		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
		if (excess < 0) {
			/*
			 * We still haven't received enough data to complete
			 * the fragment, so continue on to the next mblk.
			 */
			continue;
		}
		/*
		 * We've got a complete fragment.  If there are excess bytes,
		 * then they're part of the next fragment's header (of either
		 * this RPC message or the next RPC message).  Split that part
		 * into its own mblk so that we can safely freeb() it when
		 * building frag_header above.
		 */
		if (excess > 0) {
			if ((mp1 = dupb(mp)) == NULL &&
			    (mp1 = copyb(mp)) == NULL) {
				freemsg(head_mp);
				freemsg(cont_mp);
				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
				mir->mir_frag_header = 0;
				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
				mir->mir_head_mp = NULL;
				mir->mir_tail_mp = NULL;
				mir_disconnect(q, mir);	/* drops mir_mutex */
				return;
			}

			/*
			 * Relink the message chain so that the next mblk is
			 * the next fragment header, followed by the rest of
			 * the message chain.
			 */
			mp1->b_cont = cont_mp;
			cont_mp = mp1;

			/*
			 * Data in the new mblk begins at the next fragment,
			 * and data in the old mblk ends at the next fragment.
			 */
			mp1->b_rptr = mp1->b_wptr - excess;
			mp->b_wptr -= excess;
		}

		/*
		 * Reset frag_len and frag_header for the next fragment.
		 */
		frag_len = -(int32_t)sizeof (uint32_t);
		if (!(frag_header & MIR_LASTFRAG)) {
			/*
			 * The current fragment is complete, but more
			 * fragments need to be processed before we can
			 * pass along the RPC message starting at head_mp.
			 */
			frag_header = 0;
			continue;
		}
		frag_header = 0;

		/*
		 * We've got a complete RPC message; pass it to the
		 * appropriate consumer.
		 */
		switch (mir->mir_type) {
		case RPC_CLIENT:
			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
				/*
				 * Mark this stream as active.  This marker
				 * is used in mir_timer().
				 */
				mir->mir_clntreq = 1;
				mir->mir_use_timestamp = ddi_get_lbolt();
			} else {
				freemsg(head_mp);
			}
			break;

		case RPC_SERVER:
			/*
			 * Check for flow control before passing the
			 * message to kRPC.
			 */
			if (!mir->mir_hold_inbound) {
				if (mir->mir_krpc_cell) {

					if (mir_check_len(q, head_mp))
						return;

					if (q->q_first == NULL &&
					    svc_queuereq(q, head_mp, TRUE)) {
						/*
						 * If the reference count is 0
						 * (not including this
						 * request), then the stream is
						 * transitioning from idle to
						 * non-idle.  In this case, we
						 * cancel the idle timer.
						 */
						if (mir->mir_ref_cnt++ == 0)
							stop_timer = B_TRUE;
					} else {
						(void) putq(q, head_mp);
						mir->mir_inrservice = B_TRUE;
					}
				} else {
					/*
					 * Count # of times this happens. Should
					 * be never, but experience shows
					 * otherwise.
					 */
					mir_krpc_cell_null++;
					freemsg(head_mp);
				}
			} else {
				/*
				 * If the outbound side of the stream is
				 * flow controlled, then hold this message
				 * until client catches up. mir_hold_inbound
				 * is set in mir_wput and cleared in mir_wsrv.
				 */
				(void) putq(q, head_mp);
				mir->mir_inrservice = B_TRUE;
			}
			break;
		default:
			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
			    mir->mir_type);
			freemsg(head_mp);
			break;
		}

		/*
		 * Reset the chain since we're starting on a new RPC message.
		 */
		head_mp = tail_mp = NULL;
	} while ((mp = cont_mp) != NULL);

	/*
	 * Sanity check the message length; if it's too large mir_check_len()
	 * will shutdown the connection, drop mir_mutex, and return non-zero.
	 */
	if (head_mp != NULL && mir->mir_setup_complete &&
	    mir_check_len(q, head_mp))
		return;

	/* Save our local copies back in the mir structure. */
	mir->mir_frag_header = frag_header;
	mir->mir_frag_len = frag_len;
	mir->mir_head_mp = head_mp;
	mir->mir_tail_mp = tail_mp;

	/*
	 * The timer is stopped after the whole message chain is processed.
	 * The reason is that stopping the timer releases the mir_mutex
	 * lock temporarily.  This means that the request can be serviced
	 * while we are still processing the message chain.  This is not
	 * good.  So we stop the timer here instead.
	 *
	 * Note that if the timer fires before we stop it, it will not
	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
	 * will just return.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
		    "ref cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}
	mutex_exit(&mir->mir_mutex);
}
static void
mir_rput_proto(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	uint32_t	type;
	uint32_t reason = 0;

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	type = ((union T_primitives *)mp->b_rptr)->type;
	switch (mir->mir_type) {
	case RPC_CLIENT:
		switch (type) {
		case T_DISCON_IND:
			reason = ((struct T_discon_ind *)
			    (mp->b_rptr))->DISCON_reason;
			/*FALLTHROUGH*/
		case T_ORDREL_IND:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_head_mp) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = (mblk_t *)0;
				mir->mir_tail_mp = (mblk_t *)0;
			}
			/*
			 * We are disconnecting, but not necessarily
			 * closing.  By not closing, we will fail to
			 * pick up a possibly changed global timeout value,
			 * unless we store it now.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(WR(q), mir);

			/*
			 * Even though we are unconnected, we still
			 * leave the idle timer going on the client.  The
			 * reason for this is that if we've disconnected due
			 * to a server-side disconnect, reset, or connection
			 * timeout, there is a possibility the client may
			 * retry the RPC request.  This retry needs to be
			 * done on the same bound address for the server to
			 * interpret it as such.  However, we don't want
			 * to wait forever for that possibility.  If the
			 * end-point stays unconnected for mir_idle_timeout
			 * units of time, then that is a signal to the
			 * connection manager to give up waiting for the
			 * application (e.g. NFS) to send a retry.
			 */
			mir_clnt_idle_start(WR(q), mir);
			mutex_exit(&mir->mir_mutex);
			clnt_dispatch_notifyall(WR(q), type, reason);
			freemsg(mp);
			return;
1688 */ 1689 mir_clnt_idle_start(WR(q), mir); 1690 mutex_exit(&mir->mir_mutex); 1691 clnt_dispatch_notifyall(WR(q), type, reason); 1692 freemsg(mp); 1693 return; 1694 case T_ERROR_ACK: 1695 { 1696 struct T_error_ack *terror; 1697 1698 terror = (struct T_error_ack *)mp->b_rptr; 1699 RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p", 1700 (void *)q); 1701 RPCLOG(1, " ERROR_prim: %s,", 1702 rpc_tpiprim2name(terror->ERROR_prim)); 1703 RPCLOG(1, " TLI_error: %s,", 1704 rpc_tpierr2name(terror->TLI_error)); 1705 RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error); 1706 if (terror->ERROR_prim == T_DISCON_REQ) { 1707 clnt_dispatch_notifyall(WR(q), type, reason); 1708 freemsg(mp); 1709 return; 1710 } else { 1711 if (clnt_dispatch_notifyconn(WR(q), mp)) 1712 return; 1713 } 1714 break; 1715 } 1716 case T_OK_ACK: 1717 { 1718 struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr; 1719 1720 if (tok->CORRECT_prim == T_DISCON_REQ) { 1721 clnt_dispatch_notifyall(WR(q), type, reason); 1722 freemsg(mp); 1723 return; 1724 } else { 1725 if (clnt_dispatch_notifyconn(WR(q), mp)) 1726 return; 1727 } 1728 break; 1729 } 1730 case T_CONN_CON: 1731 case T_INFO_ACK: 1732 case T_OPTMGMT_ACK: 1733 if (clnt_dispatch_notifyconn(WR(q), mp)) 1734 return; 1735 break; 1736 case T_BIND_ACK: 1737 break; 1738 default: 1739 RPCLOG(1, "mir_rput: unexpected message %d " 1740 "for kRPC client\n", 1741 ((union T_primitives *)mp->b_rptr)->type); 1742 break; 1743 } 1744 break; 1745 1746 case RPC_SERVER: 1747 switch (type) { 1748 case T_BIND_ACK: 1749 { 1750 struct T_bind_ack *tbind; 1751 1752 /* 1753 * If this is a listening stream, then shut 1754 * off the idle timer. 1755 */ 1756 tbind = (struct T_bind_ack *)mp->b_rptr; 1757 if (tbind->CONIND_number > 0) { 1758 mutex_enter(&mir->mir_mutex); 1759 mir_svc_idle_stop(WR(q), mir); 1760 1761 /* 1762 * mark this as a listen endpoint 1763 * for special handling. 1764 */ 1765 1766 mir->mir_listen_stream = 1; 1767 mutex_exit(&mir->mir_mutex); 1768 } 1769 break; 1770 } 1771 case T_DISCON_IND: 1772 case T_ORDREL_IND: 1773 RPCLOG(16, "mir_rput_proto: got %s indication\n", 1774 type == T_DISCON_IND ? "disconnect" 1775 : "orderly release"); 1776 1777 /* 1778 * For listen endpoint just pass 1779 * on the message. 1780 */ 1781 1782 if (mir->mir_listen_stream) 1783 break; 1784 1785 mutex_enter(&mir->mir_mutex); 1786 1787 /* 1788 * If client wants to break off connection, record 1789 * that fact. 1790 */ 1791 mir_svc_start_close(WR(q), mir); 1792 1793 /* 1794 * If we are idle, then send the orderly release 1795 * or disconnect indication to nfsd. 1796 */ 1797 if (MIR_SVC_QUIESCED(mir)) { 1798 mutex_exit(&mir->mir_mutex); 1799 break; 1800 } 1801 1802 RPCLOG(16, "mir_rput_proto: not idle, so " 1803 "disconnect/ord rel indication not passed " 1804 "upstream on 0x%p\n", (void *)q); 1805 1806 /* 1807 * Hold the indication until we get idle 1808 * If there already is an indication stored, 1809 * replace it if the new one is a disconnect. The 1810 * reasoning is that disconnection takes less time 1811 * to process, and once a client decides to 1812 * disconnect, we should do that. 1813 */ 1814 if (mir->mir_svc_pend_mp) { 1815 if (type == T_DISCON_IND) { 1816 RPCLOG(16, "mir_rput_proto: replacing" 1817 " held disconnect/ord rel" 1818 " indication with disconnect on" 1819 " 0x%p\n", (void *)q); 1820 1821 freemsg(mir->mir_svc_pend_mp); 1822 mir->mir_svc_pend_mp = mp; 1823 } else { 1824 RPCLOG(16, "mir_rput_proto: already " 1825 "held a disconnect/ord rel " 1826 "indication. 
/*
 * The server-side read queues are used to hold inbound messages while
 * outbound flow control is exerted.  When outbound flow control is
 * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
 * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
 */
static void
mir_rsrv(queue_t *q)
{
	mir_t	*mir;
	mblk_t	*mp;
	boolean_t stop_timer = B_FALSE;

	mir = (mir_t *)q->q_ptr;
	mutex_enter(&mir->mir_mutex);

	mp = NULL;
	switch (mir->mir_type) {
	case RPC_SERVER:
		if (mir->mir_ref_cnt == 0)
			mir->mir_hold_inbound = 0;
		if (mir->mir_hold_inbound)
			break;

		while (mp = getq(q)) {
			if (mir->mir_krpc_cell &&
			    (mir->mir_svc_no_more_msgs == 0)) {

				if (mir_check_len(q, mp))
					return;

				if (svc_queuereq(q, mp, TRUE)) {
					/*
					 * If we were idle, turn off idle timer
					 * since we aren't idle any more.
					 */
					if (mir->mir_ref_cnt++ == 0)
						stop_timer = B_TRUE;
				} else {
					(void) putbq(q, mp);
					break;
				}
			} else {
				/*
				 * Count # of times this happens. Should be
				 * never, but experience shows otherwise.
				 */
				if (mir->mir_krpc_cell == NULL)
					mir_krpc_cell_null++;
				freemsg(mp);
			}
		}
		break;
	case RPC_CLIENT:
		break;
	default:
		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);

		if (q->q_first == NULL)
			MIR_CLEAR_INRSRV(mir);

		mutex_exit(&mir->mir_mutex);

		return;
	}

	/*
	 * The timer is stopped after all the messages are processed.
	 * The reason is that stopping the timer releases the mir_mutex
	 * lock temporarily.  This means that the request can be serviced
	 * while we are still processing the message queue.  This is not
	 * good.  So we stop the timer here instead.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
		    "cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}

	if (q->q_first == NULL) {
		mblk_t	*cmp = NULL;

		MIR_CLEAR_INRSRV(mir);

		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
			cmp = mir->mir_svc_pend_mp;
			mir->mir_svc_pend_mp = NULL;
		}

		mutex_exit(&mir->mir_mutex);

		if (cmp != NULL) {
			RPCLOG(16, "mir_rsrv: line %d: sending a held "
			    "disconnect/ord rel indication upstream\n",
			    __LINE__);
			putnext(q, cmp);
		}

		return;
	}
	mutex_exit(&mir->mir_mutex);
}

static int mir_svc_policy_fails;
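/*
 * Added note on the function below: the "event" is delivered to the
 * stream-head consumer (nfsd/lockd) as a bare M_DATA message holding
 * one 4-byte network-order integer, so the listener is expected to
 * read the stream and interpret each word as an event code.  The only
 * code sent from this file is 1, used by mir_timer() to request that
 * an idle connection be closed.
 */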
/*
 * Called to send an event code to nfsd/lockd so that it initiates
 * connection close.
 */
static int
mir_svc_policy_notify(queue_t *q, int event)
{
	mblk_t	*mp;
#ifdef DEBUG
	mir_t *mir = (mir_t *)q->q_ptr;
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
#endif
	ASSERT(q->q_flag & QREADR);

	/*
	 * Create an M_DATA message with the event code and pass it to the
	 * Stream head (nfsd or whoever created the stream will consume it).
	 */
	mp = allocb(sizeof (int), BPRI_HI);

	if (!mp) {

		mir_svc_policy_fails++;
		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
		    "%d\n", event);
		return (ENOMEM);
	}

	U32_TO_BE32(event, mp->b_rptr);
	mp->b_wptr = mp->b_rptr + sizeof (int);
	putnext(q, mp);
	return (0);
}

/*
 * Server side: start the close phase.  We want to get this rpcmod slot in an
 * idle state before mir_close() is called.
 */
static void
mir_svc_start_close(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);

	/*
	 * Do not accept any more messages.
	 */
	mir->mir_svc_no_more_msgs = 1;

	/*
	 * Next two statements will make the read service procedure
	 * free everything stuck in the streams read queue.
	 * It's not necessary because enabling the write queue will
	 * have the same effect, but why not speed the process along?
	 */
	mir->mir_hold_inbound = 0;
	qenable(RD(wq));

	/*
	 * Meanwhile force the write service procedure to send the
	 * responses downstream, regardless of flow control.
	 */
	qenable(wq);
}

void
mir_svc_hold(queue_t *wq)
{
	mir_t *mir = (mir_t *)wq->q_ptr;

	mutex_enter(&mir->mir_mutex);
	mir->mir_ref_cnt++;
	mutex_exit(&mir->mir_mutex);
}

/*
 * This routine is called directly by kRPC after a request is completed,
 * whether a reply was sent or the request was dropped.
 */
void
mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;
	mblk_t	*cmp = NULL;

	ASSERT((wq->q_flag & QREADR) == 0);
	if (mp)
		freemsg(mp);

	if (enable)
		qenable(RD(wq));

	mutex_enter(&mir->mir_mutex);

	/*
	 * Start idle processing if this is the last reference.
	 */
	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
		cmp = mir->mir_svc_pend_mp;
		mir->mir_svc_pend_mp = NULL;
	}

	if (cmp) {
		RPCLOG(16, "mir_svc_release: sending a held "
		    "disconnect/ord rel indication upstream on queue 0x%p\n",
		    (void *)RD(wq));

		mutex_exit(&mir->mir_mutex);

		putnext(RD(wq), cmp);

		mutex_enter(&mir->mir_mutex);
	}

	/*
	 * Start idle processing if this is the last reference.
	 */
	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {

		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
		    "because ref cnt is zero\n", (void *) wq);

		mir_svc_idle_start(wq, mir);
	}

	mir->mir_ref_cnt--;
	ASSERT(mir->mir_ref_cnt >= 0);

	/*
	 * Wake up the thread waiting to close.
	 */

	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
		cv_signal(&mir->mir_condvar);

	mutex_exit(&mir->mir_mutex);
}
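/*
 * Added note: mir_ref_cnt is raised whenever a request is handed to
 * kRPC (directly in mir_rput()/mir_rsrv(), or via mir_svc_hold()) and
 * dropped in mir_svc_release() above, mirroring rpcmod_hold() and
 * rpcmod_release() on the connectionless side.  mir_close() cannot
 * complete until the count drains to zero (MIR_SVC_QUIESCED()).
 */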
/*
 * This routine is called by server-side kRPC when it is ready to
 * handle inbound messages on the stream.
 */
static void
mir_svc_start(queue_t *wq)
{
        mir_t *mir = (mir_t *)wq->q_ptr;

        /*
         * We no longer need to take the mir_mutex here because the
         * mir_setup_complete field has been moved out of the bit field
         * that is protected by the mir_mutex.
         */
        mir->mir_setup_complete = 1;
        qenable(RD(wq));
}

/*
 * Client side wrapper for stopping the timer with the normal idle timeout.
 */
static void
mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));
        ASSERT((wq->q_flag & QREADR) == 0);
        ASSERT(mir->mir_type == RPC_CLIENT);

        mir_timer_stop(mir);
}

/*
 * Client side wrapper for starting the timer with the normal idle timeout.
 */
static void
mir_clnt_idle_start(queue_t *wq, mir_t *mir)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));
        ASSERT((wq->q_flag & QREADR) == 0);
        ASSERT(mir->mir_type == RPC_CLIENT);

        mir_timer_start(wq, mir, mir->mir_idle_timeout);
}

/*
 * Client side only.  Forces rpcmod to stop sending T_ORDREL_REQs on
 * end-points that aren't connected.
 */
static void
mir_clnt_idle_do_stop(queue_t *wq)
{
        mir_t *mir = (mir_t *)wq->q_ptr;

        RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
        mutex_enter(&mir->mir_mutex);
        mir_clnt_idle_stop(wq, mir);
        mutex_exit(&mir->mir_mutex);
}
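/*
 * Illustrative sketch (not part of the original source): callers must
 * hold mir_mutex around the idle-timer wrappers, as
 * mir_clnt_idle_do_stop() does above.  This hypothetical helper mirrors
 * the stop-then-start sequence used for T_CONN_REQ in mir_wput_other()
 * below, which picks up a changed clnt_idle_timeout tunable.
 */
#ifdef RPCMOD_EXAMPLES
static void
example_restart_clnt_timer(queue_t *wq)
{
        mir_t *mir = (mir_t *)wq->q_ptr;

        mutex_enter(&mir->mir_mutex);
        mir_clnt_idle_stop(wq, mir);            /* cancel pending timeout */
        mir->mir_idle_timeout = clnt_idle_timeout; /* refresh from tunable */
        mir_clnt_idle_start(wq, mir);           /* re-arm with new value */
        mutex_exit(&mir->mir_mutex);
}
#endif  /* RPCMOD_EXAMPLES */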
/*
 * Timer handler.  It handles the idle timeout and the memory shortage
 * problem.
 */
static void
mir_timer(void *arg)
{
        queue_t *wq = (queue_t *)arg;
        mir_t *mir = (mir_t *)wq->q_ptr;
        boolean_t notify;
        clock_t now;

        mutex_enter(&mir->mir_mutex);

        /*
         * mir_timer_call is set only while either mir_timer_start() or
         * mir_timer_stop() is in progress, and mir_timer() can only run
         * concurrently with them if the timer is being stopped.  So in
         * that case just return.
         */
        if (mir->mir_timer_call) {
                mutex_exit(&mir->mir_mutex);
                return;
        }
        mir->mir_timer_id = 0;

        switch (mir->mir_type) {
        case RPC_CLIENT:

                /*
                 * For clients, the timer fires at clnt_idle_timeout
                 * intervals.  If the activity marker (mir_clntreq) is
                 * zero, then the stream has been idle since the last
                 * timer event and we notify kRPC.  If mir_clntreq is
                 * non-zero, then the stream is active and we just
                 * restart the timer for another interval.  mir_clntreq
                 * is set to 1 in mir_wput for every request passed
                 * downstream.
                 *
                 * If this was a memory shortage timer, reset the idle
                 * timeout regardless; mir_clntreq will not be a valid
                 * indicator.
                 *
                 * The timer is initially started in mir_wput during
                 * RPC_CLIENT ioctl processing.
                 *
                 * The timer interval can be changed for individual
                 * streams with the ND variable "mir_idle_timeout".
                 */
                now = ddi_get_lbolt();
                if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
                    MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
                        clock_t tout;

                        tout = mir->mir_idle_timeout -
                            TICK_TO_MSEC(now - mir->mir_use_timestamp);
                        if (tout < 0)
                                tout = 1000;
#if 0
                        printf("mir_timer[%d < %d + %d]: reset client timer "
                            "to %d (ms)\n", TICK_TO_MSEC(now),
                            TICK_TO_MSEC(mir->mir_use_timestamp),
                            mir->mir_idle_timeout, tout);
#endif
                        mir->mir_clntreq = 0;
                        mir_timer_start(wq, mir, tout);
                        mutex_exit(&mir->mir_mutex);
                        return;
                }
#if 0
                printf("mir_timer[%d]: doing client timeout\n", now / hz);
#endif
                /*
                 * We are disconnecting, but not necessarily closing.  By
                 * not closing, we will fail to pick up a possibly changed
                 * global timeout value, unless we store it now.
                 */
                mir->mir_idle_timeout = clnt_idle_timeout;
                mir_clnt_idle_start(wq, mir);

                mutex_exit(&mir->mir_mutex);
                /*
                 * We pass T_ORDREL_REQ as an integer value to kRPC as the
                 * indication that the stream is idle.  This is not a
                 * T_ORDREL_REQ message; it is just a convenient value,
                 * since we call the same kRPC routine for T_ORDREL_INDs
                 * and T_DISCON_INDs.
                 */
                clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
                return;

        case RPC_SERVER:

                /*
                 * For servers, the timer is only running when the stream
                 * is really idle or memory is short.  The timer is started
                 * by mir_wput when mir_type is set to RPC_SERVER and
                 * by mir_svc_idle_start whenever the stream goes idle
                 * (mir_ref_cnt == 0).  The timer is cancelled in
                 * mir_rput whenever a new inbound request is passed to kRPC
                 * and the stream was previously idle.
                 *
                 * The timer interval can be changed for individual
                 * streams with the ND variable "mir_idle_timeout".
                 *
                 * If the stream is not idle, do nothing.
                 */
                if (!MIR_SVC_QUIESCED(mir)) {
                        mutex_exit(&mir->mir_mutex);
                        return;
                }

                notify = !mir->mir_inrservice;
                mutex_exit(&mir->mir_mutex);

                /*
                 * If there is no packet queued up in the read queue, the
                 * stream is really idle, so notify nfsd to close it.
                 */
                if (notify) {
                        RPCLOG(16, "mir_timer: telling stream head listener "
                            "to close stream (0x%p)\n", (void *)RD(wq));
                        (void) mir_svc_policy_notify(RD(wq), 1);
                }
                return;
        default:
                RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
                    mir->mir_type);
                mutex_exit(&mir->mir_mutex);
                return;
        }
}
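/*
 * Illustrative sketch (not part of the original source): the
 * residual-timeout arithmetic from the RPC_CLIENT case of mir_timer()
 * above.  mir_idle_timeout is in milliseconds while lbolt timestamps
 * are in clock ticks.  For example, with hz == 100, a stream last used
 * 500 ticks (5000 ms) ago under a 20000 ms idle timeout is re-armed
 * for another 15000 ms.  The helper name is hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static clock_t
example_residual_timeout(mir_t *mir, clock_t now)
{
        clock_t tout = mir->mir_idle_timeout -
            TICK_TO_MSEC(now - mir->mir_use_timestamp);

        /* Clamp a negative residue, as mir_timer() does. */
        return (tout < 0 ? 1000 : tout);
}
#endif  /* RPCMOD_EXAMPLES */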
/*
 * Called by the RPC package to send either a call or a return, or a
 * transport connection request.  Adds the record marking header.
 */
static void
mir_wput(queue_t *q, mblk_t *mp)
{
        uint_t  frag_header;
        mir_t   *mir = (mir_t *)q->q_ptr;
        uchar_t *rptr = mp->b_rptr;

        if (!mir) {
                freemsg(mp);
                return;
        }

        if (mp->b_datap->db_type != M_DATA) {
                mir_wput_other(q, mp);
                return;
        }

        if (mir->mir_ordrel_pending == 1) {
                freemsg(mp);
                RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
                    (void *)q);
                return;
        }

        frag_header = (uint_t)DLEN(mp);
        frag_header |= MIR_LASTFRAG;

        /* Stick in the 4 byte record marking header. */
        if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
            !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
                /*
                 * Since we know that M_DATA messages are created exclusively
                 * by kRPC, we expect that kRPC will leave room for our header
                 * and 4 byte align, which is normal for XDR.
                 * If kRPC (or someone else) does not cooperate, then we
                 * just throw away the message.
                 */
                RPCLOG(1, "mir_wput: kRPC did not leave space for record "
                    "fragment header (%d bytes left)\n",
                    (int)(rptr - mp->b_datap->db_base));
                freemsg(mp);
                return;
        }
        rptr -= sizeof (uint32_t);
        *(uint32_t *)rptr = htonl(frag_header);
        mp->b_rptr = rptr;

        mutex_enter(&mir->mir_mutex);
        if (mir->mir_type == RPC_CLIENT) {
                /*
                 * For the client, set mir_clntreq to indicate that the
                 * connection is active.
                 */
                mir->mir_clntreq = 1;
                mir->mir_use_timestamp = ddi_get_lbolt();
        }

        /*
         * If we haven't already queued some data and the downstream module
         * can accept more data, send it on; otherwise we queue the message
         * and take other actions depending on mir_type.
         */
        if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
                mutex_exit(&mir->mir_mutex);

                /*
                 * Now we pass the RPC message downstream.
                 */
                putnext(q, mp);
                return;
        }

        switch (mir->mir_type) {
        case RPC_CLIENT:
                /*
                 * Check for a previous duplicate request on the
                 * queue.  If there is one, then we throw away
                 * the current message and let the previous one
                 * go through.  If we can't find a duplicate, then
                 * send this one.  This tap dance is an effort
                 * to reduce traffic and processing requirements
                 * under load conditions.
                 */
                if (mir_clnt_dup_request(q, mp)) {
                        mutex_exit(&mir->mir_mutex);
                        freemsg(mp);
                        return;
                }
                break;
        case RPC_SERVER:
                /*
                 * Set mir_hold_inbound so that new inbound RPC
                 * messages will be held until the client catches
                 * up on the earlier replies.  This flag is cleared
                 * in mir_wsrv after flow control is relieved;
                 * the read-side queue is also enabled at that time.
                 */
                mir->mir_hold_inbound = 1;
                break;
        default:
                RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
                break;
        }
        mir->mir_inwservice = 1;
        (void) putq(q, mp);
        mutex_exit(&mir->mir_mutex);
}
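/*
 * Illustrative sketch (not part of the original source): the 4-byte
 * record mark written by mir_wput() above is the fragment length with
 * MIR_LASTFRAG set on the final fragment, sent in network byte order.
 * A peer would decode it as follows; the function name is hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static void
example_decode_record_mark(uint32_t netmark, uint32_t *lenp,
    boolean_t *lastp)
{
        uint32_t mark = ntohl(netmark);

        *lastp = (mark & MIR_LASTFRAG) ? B_TRUE : B_FALSE;
        *lenp = mark & ~MIR_LASTFRAG;
}
#endif  /* RPCMOD_EXAMPLES */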
static void
mir_wput_other(queue_t *q, mblk_t *mp)
{
        mir_t   *mir = (mir_t *)q->q_ptr;
        struct iocblk   *iocp;
        uchar_t *rptr = mp->b_rptr;
        bool_t  flush_in_svc = FALSE;

        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
        switch (mp->b_datap->db_type) {
        case M_IOCTL:
                iocp = (struct iocblk *)rptr;
                switch (iocp->ioc_cmd) {
                case RPC_CLIENT:
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != 0 &&
                            mir->mir_type != iocp->ioc_cmd) {
                        ioc_eperm:
                                mutex_exit(&mir->mir_mutex);
                                iocp->ioc_error = EPERM;
                                iocp->ioc_count = 0;
                                mp->b_datap->db_type = M_IOCACK;
                                qreply(q, mp);
                                return;
                        }

                        mir->mir_type = iocp->ioc_cmd;

                        /*
                         * Clear mir_hold_inbound, which was set to 1 by
                         * mir_open.  This flag is not used on client
                         * streams.
                         */
                        mir->mir_hold_inbound = 0;
                        mir->mir_max_msg_sizep = &clnt_max_msg_size;

                        /*
                         * Start the idle timer.  See mir_timer() for more
                         * information on how client timers work.
                         */
                        mir->mir_idle_timeout = clnt_idle_timeout;
                        mir_clnt_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        mp->b_datap->db_type = M_IOCACK;
                        qreply(q, mp);
                        return;
                case RPC_SERVER:
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != 0 &&
                            mir->mir_type != iocp->ioc_cmd)
                                goto ioc_eperm;

                        /*
                         * We don't clear mir_hold_inbound here because
                         * mir_hold_inbound is used in the flow control
                         * model.  If we cleared it here, then we'd commit
                         * a small violation of the model, where the
                         * transport might immediately block downstream flow.
                         */
                        mir->mir_type = iocp->ioc_cmd;
                        mir->mir_max_msg_sizep = &svc_max_msg_size;

                        /*
                         * Start the idle timer.  See mir_timer() for more
                         * information on how server timers work.
                         *
                         * Note that it is important to start the idle timer
                         * here so that connections time out even if we
                         * never receive any data on them.
                         */
                        mir->mir_idle_timeout = svc_idle_timeout;
                        RPCLOG(16, "mir_wput_other: starting idle timer on "
                            "0x%p because we got RPC_SERVER ioctl\n",
                            (void *)q);
                        mir_svc_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        mp->b_datap->db_type = M_IOCACK;
                        qreply(q, mp);
                        return;
                default:
                        break;
                }
                break;

        case M_PROTO:
                if (mir->mir_type == RPC_CLIENT) {
                        /*
                         * We are likely being called from the context of a
                         * service procedure, so we need to enqueue.
                         * However, enqueuing may put our message behind
                         * data messages, so flush the data first.
                         */
                        flush_in_svc = TRUE;
                }
                if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
                    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
                        break;

                switch (((union T_primitives *)rptr)->type) {
                case T_DATA_REQ:
                        /* Don't pass T_DATA_REQ messages downstream. */
                        freemsg(mp);
                        return;
                case T_ORDREL_REQ:
                        RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
                            (void *)q);
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != RPC_SERVER) {
                                /*
                                 * We are likely being called from
                                 * clnt_dispatch_notifyall().  Sending a
                                 * T_ORDREL_REQ will result in some kind of
                                 * _IND message being sent, which in turn
                                 * will result in another call to
                                 * clnt_dispatch_notifyall().  To keep the
                                 * stack lean, queue this message.
                                 */
                                mir->mir_inwservice = 1;
                                (void) putq(q, mp);
                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * Mark the structure such that we don't accept any
                         * more requests from the client.  We could defer
                         * this until we actually send the orderly release
                         * request downstream, but all that does is delay
                         * the closing of this stream.
                         */
                        RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ, "
                            "so calling mir_svc_start_close\n", (void *)q);

                        mir_svc_start_close(q, mir);

                        /*
                         * If we have sent down a T_ORDREL_REQ, don't send
                         * any more.
                         */
                        if (mir->mir_ordrel_pending) {
                                freemsg(mp);
                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * If the stream is not idle, then we hold the
                         * orderly release until it becomes idle.  This
                         * ensures that kRPC will be able to reply to
                         * all requests that we have passed to it.
                         *
                         * We also queue the request if there is data
                         * already queued, because we cannot allow the
                         * T_ORDREL_REQ to go before data.  When we had a
                         * separate reply count, this was not a problem,
                         * because the reply count was reconciled when
                         * mir_wsrv() completed.
                         */
                        if (!MIR_SVC_QUIESCED(mir) ||
                            mir->mir_inwservice == 1) {
                                mir->mir_inwservice = 1;
                                (void) putq(q, mp);

                                RPCLOG(16, "mir_wput_other: queuing "
                                    "T_ORDREL_REQ on 0x%p\n", (void *)q);

                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * Mark the structure so that we know we sent an
                         * orderly release request, and reset the idle
                         * timer.
                         */
                        mir->mir_ordrel_pending = 1;

                        RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
                            " on 0x%p because we got T_ORDREL_REQ\n",
                            (void *)q);

                        mir_svc_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        /*
                         * When we break, we will putnext the T_ORDREL_REQ.
                         */
                        break;

                case T_CONN_REQ:
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_head_mp != NULL) {
                                freemsg(mir->mir_head_mp);
                                mir->mir_head_mp = NULL;
                                mir->mir_tail_mp = NULL;
                        }
                        mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
                        /*
                         * Restart the timer in case mir_clnt_idle_do_stop()
                         * was called.
                         */
                        mir->mir_idle_timeout = clnt_idle_timeout;
                        mir_clnt_idle_stop(q, mir);
                        mir_clnt_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);
                        break;

                default:
                        /*
                         * T_DISCON_REQ is one of the interesting default
                         * cases here.  Ideally, an M_FLUSH is done before
                         * T_DISCON_REQ is done.  However, that is somewhat
                         * cumbersome for clnt_cots.c to do.  So we queue
                         * T_DISCON_REQ, and let the service procedure
                         * flush all M_DATA.
                         */
                        break;
                }
                /* FALLTHROUGH */
        default:
                if (mp->b_datap->db_type >= QPCTL) {
                        if (mp->b_datap->db_type == M_FLUSH) {
                                if (mir->mir_type == RPC_CLIENT &&
                                    *mp->b_rptr & FLUSHW) {
                                        RPCLOG(32, "mir_wput_other: flushing "
                                            "wq 0x%p\n", (void *)q);
                                        if (*mp->b_rptr & FLUSHBAND) {
                                                flushband(q, *(mp->b_rptr + 1),
                                                    FLUSHDATA);
                                        } else {
                                                flushq(q, FLUSHDATA);
                                        }
                                } else {
                                        RPCLOG(32, "mir_wput_other: ignoring "
                                            "M_FLUSH on wq 0x%p\n", (void *)q);
                                }
                        }
                        break;
                }

                mutex_enter(&mir->mir_mutex);
                if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
                        mutex_exit(&mir->mir_mutex);
                        break;
                }
                mir->mir_inwservice = 1;
                mir->mir_inwflushdata = flush_in_svc;
                (void) putq(q, mp);
                mutex_exit(&mir->mir_mutex);
                qenable(q);

                return;
        }
        putnext(q, mp);
}
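/*
 * Illustrative sketch (not part of the original source): how the
 * RPC_CLIENT/RPC_SERVER M_IOCTLs handled by mir_wput_other() above
 * might be generated.  A real consumer sends the ioctl through the
 * stream head rather than calling putnext() on rpcmod's write queue
 * directly; this compressed sketch and the example_* name are
 * hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static int
example_set_client_mode(queue_t *wq)
{
        mblk_t *iocmp = mkiocb(RPC_CLIENT);     /* build an M_IOCTL mblk */

        if (iocmp == NULL)
                return (ENOMEM);

        /* mir_wput_other() acknowledges with M_IOCACK via qreply(). */
        putnext(wq, iocmp);
        return (0);
}
#endif  /* RPCMOD_EXAMPLES */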
static void
mir_wsrv(queue_t *q)
{
        mblk_t  *mp;
        mir_t   *mir;
        bool_t  flushdata;

        mir = (mir_t *)q->q_ptr;
        mutex_enter(&mir->mir_mutex);

        flushdata = mir->mir_inwflushdata;
        mir->mir_inwflushdata = 0;

        while ((mp = getq(q)) != NULL) {
                if (mp->b_datap->db_type == M_DATA) {
                        /*
                         * Do not send any more data if we have sent
                         * a T_ORDREL_REQ.
                         */
                        if (flushdata || mir->mir_ordrel_pending == 1) {
                                freemsg(mp);
                                continue;
                        }

                        /*
                         * Make sure that the stream can really handle more
                         * data.
                         */
                        if (!MIR_WCANPUTNEXT(mir, q)) {
                                (void) putbq(q, mp);
                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * Now we pass the RPC message downstream.
                         */
                        mutex_exit(&mir->mir_mutex);
                        putnext(q, mp);
                        mutex_enter(&mir->mir_mutex);
                        continue;
                }

                /*
                 * This is not an RPC message; pass it downstream
                 * (ignoring flow control) if the server side is not
                 * sending a T_ORDREL_REQ downstream.
                 */
                if (mir->mir_type != RPC_SERVER ||
                    ((union T_primitives *)mp->b_rptr)->type !=
                    T_ORDREL_REQ) {
                        mutex_exit(&mir->mir_mutex);
                        putnext(q, mp);
                        mutex_enter(&mir->mir_mutex);
                        continue;
                }

                if (mir->mir_ordrel_pending == 1) {
                        /*
                         * Don't send two T_ORDRELs.
                         */
                        freemsg(mp);
                        continue;
                }

                /*
                 * Mark the structure so that we know we sent an orderly
                 * release request.  We will check to see if the slot is
                 * idle at the end of this routine, and if so, reset the
                 * idle timer to handle orderly release timeouts.
                 */
                mir->mir_ordrel_pending = 1;
                RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
                    (void *)q);
                /*
                 * Send the orderly release downstream.  If there are other
                 * pending replies we won't be able to send them.  However,
                 * the only reason we should send the orderly release is if
                 * we were idle, or if an unusual event occurred.
                 */
                mutex_exit(&mir->mir_mutex);
                putnext(q, mp);
                mutex_enter(&mir->mir_mutex);
        }

        if (q->q_first == NULL) {
                /*
                 * If we call mir_svc_idle_start() below, then
                 * clearing mir_inwservice here will also result in
                 * any thread waiting in mir_close() being signaled.
                 */
                mir->mir_inwservice = 0;
        }

        if (mir->mir_type != RPC_SERVER) {
                mutex_exit(&mir->mir_mutex);
                return;
        }

        /*
         * If idle, we call mir_svc_idle_start to start the timer (or wake
         * up a close).  Also make sure not to start the idle timer on the
         * listener stream; doing so can cause nfsd to send an orderly
         * release command on the listener stream.
         */
        if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
                RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
                    "because mir slot is idle\n", (void *)q);
                mir_svc_idle_start(q, mir);
        }

        /*
         * If outbound flow control has been relieved, then allow new
         * inbound requests to be processed.
         */
        if (mir->mir_hold_inbound) {
                mir->mir_hold_inbound = 0;
                qenable(RD(q));
        }
        mutex_exit(&mir->mir_mutex);
}
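/*
 * Illustrative sketch (not part of the original source): mir_wsrv()
 * above drops mir_mutex around each putnext().  putnext() can run the
 * downstream put procedure synchronously, which may loop back into
 * rpcmod and try to take mir_mutex again, so holding the lock across
 * the call risks deadlock.  The helper name is hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static void
example_locked_putnext(queue_t *q, mir_t *mir, mblk_t *mp)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));

        mutex_exit(&mir->mir_mutex);
        putnext(q, mp);                 /* may re-enter this module */
        mutex_enter(&mir->mir_mutex);
}
#endif  /* RPCMOD_EXAMPLES */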
static void
mir_disconnect(queue_t *q, mir_t *mir)
{
        ASSERT(MUTEX_HELD(&mir->mir_mutex));

        switch (mir->mir_type) {
        case RPC_CLIENT:
                /*
                 * We are disconnecting, but not necessarily
                 * closing.  By not closing, we will fail to
                 * pick up a possibly changed global timeout value,
                 * unless we store it now.
                 */
                mir->mir_idle_timeout = clnt_idle_timeout;
                mir_clnt_idle_start(WR(q), mir);
                mutex_exit(&mir->mir_mutex);

                /*
                 * T_DISCON_REQ is passed to kRPC as an integer value
                 * (this is not a TPI message).  It is used as a
                 * convenient value to indicate a sanity check
                 * failure -- the same kRPC routine is also called
                 * for T_DISCON_INDs and T_ORDREL_INDs.
                 */
                clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
                break;

        case RPC_SERVER:
                mir->mir_svc_no_more_msgs = 1;
                mir_svc_idle_stop(WR(q), mir);
                mutex_exit(&mir->mir_mutex);
                RPCLOG(16, "mir_disconnect: telling "
                    "stream head listener to disconnect stream "
                    "(0x%p)\n", (void *)q);
                (void) mir_svc_policy_notify(q, 2);
                break;

        default:
                mutex_exit(&mir->mir_mutex);
                break;
        }
}
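/*
 * Illustrative sketch (not part of the original source):
 * mir_svc_policy_notify() stores the event code in big-endian order
 * with U32_TO_BE32().  A stream-head consumer such as nfsd would
 * decode the 4-byte M_DATA payload like this; event 1 asks for a close
 * (see mir_timer()) and event 2 for a disconnect (see mir_disconnect()
 * above).  The function name is hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static int
example_decode_policy_event(const uchar_t *buf)
{
        return ((buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]);
}
#endif  /* RPCMOD_EXAMPLES */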
/*
 * Sanity check the message length, and if it's too large, shut down the
 * connection.  Returns 1 if the connection is shut down; 0 otherwise.
 */
static int
mir_check_len(queue_t *q, mblk_t *head_mp)
{
        mir_t *mir = q->q_ptr;
        uint_t maxsize = 0;
        size_t msg_len = msgdsize(head_mp);

        if (mir->mir_max_msg_sizep != NULL)
                maxsize = *mir->mir_max_msg_sizep;

        if (maxsize == 0 || msg_len <= maxsize)
                return (0);

        freemsg(head_mp);
        mir->mir_head_mp = NULL;
        mir->mir_tail_mp = NULL;
        mir->mir_frag_header = 0;
        mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
        if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
                cmn_err(CE_NOTE,
                    "kRPC: record fragment from %s of size (%lu) exceeds "
                    "maximum (%u). Disconnecting",
                    (mir->mir_type == RPC_CLIENT) ? "server" :
                    (mir->mir_type == RPC_SERVER) ? "client" :
                    "test tool", msg_len, maxsize);
        }

        mir_disconnect(q, mir);
        return (1);
}
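/*
 * Illustrative sketch (not part of the original source): the guard
 * used by mir_check_len() above.  A maximum of zero disables the
 * check; the limits come from the clnt_max_msg_size and
 * svc_max_msg_size tunables selected in mir_wput_other().  The
 * function name is hypothetical.
 */
#ifdef RPCMOD_EXAMPLES
static boolean_t
example_msg_too_big(size_t msg_len, uint_t maxsize)
{
        return ((maxsize != 0 && msg_len > maxsize) ? B_TRUE : B_FALSE);
}
#endif  /* RPCMOD_EXAMPLES */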