/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2012 Milan Jurik. All rights reserved.
 * Copyright 2012 Marcel Telka <marcel@telka.sk>
 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
 */
/* Copyright (c) 1990 Mentat Inc. */

/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Kernel RPC filtering module
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/strsubr.h>
#include <sys/tihdr.h>
#include <sys/timod.h>
#include <sys/tiuser.h>
#include <sys/debug.h>
#include <sys/signal.h>
#include <sys/pcb.h>
#include <sys/user.h>
#include <sys/errno.h>
#include <sys/cred.h>
#include <sys/policy.h>
#include <sys/inline.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/file.h>
#include <sys/sysmacros.h>
#include <sys/systm.h>
#include <sys/t_lock.h>
#include <sys/ddi.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/strsun.h>

#include <sys/strlog.h>
#include <rpc/rpc_com.h>
#include <inet/common.h>
#include <rpc/types.h>
#include <sys/time.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpcsys.h>
#include <rpc/rpc_rdma.h>

/*
 * This is the loadable module wrapper.
 */
#include <sys/conf.h>
#include <sys/modctl.h>
#include <sys/syscall.h>

extern struct streamtab rpcinfo;

static struct fmodsw fsw = {
	"rpcmod",
	&rpcinfo,
	D_NEW|D_MP,
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "rpc interface str mod", &fsw
};
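
/*
 * rpcmod registers with the kernel in two roles at once: as a STREAMS
 * module (modlstrmod/fsw above) and as the rpcsys system call (modlsys
 * below, plus a 32-bit variant under _SYSCALL32_IMPL).  A single
 * mod_install() of the combined modlinkage in _init() publishes all of
 * these entry points together.
 */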

/*
 * For the RPC system call.
 */
static struct sysent rpcsysent = {
	2,
	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
	rpcsys
};

static struct modlsys modlsys = {
	&mod_syscallops,
	"RPC syscall",
	&rpcsysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
	&mod_syscallops32,
	"32-bit RPC syscall",
	&rpcsysent
};
#endif /* _SYSCALL32_IMPL */

static struct modlinkage modlinkage = {
	MODREV_1,
	{
		&modlsys,
#ifdef _SYSCALL32_IMPL
		&modlsys32,
#endif
		&modlstrmod,
		NULL
	}
};

int
_init(void)
{
	int error = 0;
	callb_id_t cid;
	int status;

	svc_init();
	clnt_init();
	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");

	if (error = mod_install(&modlinkage)) {
		/*
		 * Could not install module, cleanup previous
		 * initialization work.
		 */
		clnt_fini();
		if (cid != NULL)
			(void) callb_delete(cid);

		return (error);
	}

	/*
	 * Load up the RDMA plugins and initialize the stats.  Even if
	 * loading the plugins fails, the counters still get initialized
	 * as long as rpcmod itself was successfully installed.
	 */
	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);

	mt_kstat_init();

	/*
	 * Get our identification into ldi.  This is used for loading
	 * other modules, e.g. rpcib.
	 */
	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
	if (status != 0) {
		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
		rpcmod_li = NULL;
	}

	return (error);
}

/*
 * The unload entry point fails, because we advertise entry points into
 * rpcmod from the rest of kRPC: rpcmod_release().
 */
int
_fini(void)
{
	return (EBUSY);
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}

extern int nulldev();

#define	RPCMOD_ID	2049

int rmm_open(queue_t *, dev_t *, int, int, cred_t *);
int rmm_close(queue_t *, int, cred_t *);

/*
 * To save instructions, since STREAMS ignores the return value
 * from these functions, they are defined as void here. Kind of icky, but...
 */
void rmm_rput(queue_t *, mblk_t *);
void rmm_wput(queue_t *, mblk_t *);
void rmm_rsrv(queue_t *);
void rmm_wsrv(queue_t *);

int rpcmodopen(queue_t *, dev_t *, int, int, cred_t *);
int rpcmodclose(queue_t *, int, cred_t *);
void rpcmodrput(queue_t *, mblk_t *);
void rpcmodwput(queue_t *, mblk_t *);
void rpcmodrsrv();
void rpcmodwsrv(queue_t *);

static void rpcmodwput_other(queue_t *, mblk_t *);
static int mir_close(queue_t *q);
static int mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
    cred_t *credp);
static void mir_rput(queue_t *q, mblk_t *mp);
static void mir_rsrv(queue_t *q);
static void mir_wput(queue_t *q, mblk_t *mp);
static void mir_wsrv(queue_t *q);

static struct module_info rpcmod_info =
	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};

static struct qinit rpcmodrinit = {
	(int (*)())rmm_rput,
	(int (*)())rmm_rsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};

/*
 * The write put procedure is simply putnext to conserve stack space.
 * The write service procedure is not used to queue data, but instead to
 * synchronize with flow control.
 */
static struct qinit rpcmodwinit = {
	(int (*)())rmm_wput,
	(int (*)())rmm_wsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};
struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };

struct xprt_style_ops {
	int (*xo_open)();
	int (*xo_close)();
	void (*xo_wput)();
	void (*xo_wsrv)();
	void (*xo_rput)();
	void (*xo_rsrv)();
};

/*
 * Read side has no service procedure.
 */
static struct xprt_style_ops xprt_clts_ops = {
	rpcmodopen,
	rpcmodclose,
	rpcmodwput,
	rpcmodwsrv,
	rpcmodrput,
	NULL
};

static struct xprt_style_ops xprt_cots_ops = {
	mir_open,
	mir_close,
	mir_wput,
	mir_wsrv,
	mir_rput,
	mir_rsrv
};
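
/*
 * How the two ops vectors above are selected: rmm_open() sends a
 * T_INFO_REQ downstream and inspects the SERV_type in the resulting
 * T_INFO_ACK.  Datagram transports (T_CLTS) get xprt_clts_ops, all
 * others get xprt_cots_ops.  From then on, the rmm_* entry points just
 * indirect through the chosen vector; this works because the ops
 * pointer sits at the same offset in struct rpcm, struct temp_slot,
 * and mir_t.
 */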

/*
 * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 */
struct rpcm {
	void		*rm_krpc_cell;	/* Reserved for use by kRPC */
	struct xprt_style_ops	*rm_ops;
	int		rm_type;	/* Client or server side stream */
#define	RM_CLOSING	0x1		/* somebody is trying to close slot */
	uint_t		rm_state;	/* state of the slot. see above */
	uint_t		rm_ref;		/* cnt of external references to slot */
	kmutex_t	rm_lock;	/* mutex protecting above fields */
	kcondvar_t	rm_cwait;	/* condition for closing */
	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
};

struct temp_slot {
	void *cell;
	struct xprt_style_ops *ops;
	int type;
	mblk_t *info_ack;
	kmutex_t lock;
	kcondvar_t wait;
};

typedef struct mir_s {
	void	*mir_krpc_cell;	/* Reserved for kRPC use. This field */
					/* must be first in the structure. */
	struct xprt_style_ops	*rm_ops;
	int	mir_type;		/* Client or server side stream */

	mblk_t	*mir_head_mp;		/* RPC msg in progress */
		/*
		 * mir_head_mp points to the first mblk being collected in
		 * the current RPC message.  Record headers are removed
		 * before data is linked into mir_head_mp.
		 */
	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
		/*
		 * mir_tail_mp points to the last mblk in the message
		 * chain starting at mir_head_mp.  It is only valid
		 * if mir_head_mp is non-NULL and is used to add new
		 * data blocks to the end of the chain quickly.
		 */

	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
		/*
		 * mir_frag_len starts at -4 for the beginning of each
		 * fragment.  When this length is negative, it indicates
		 * the number of bytes that rpcmod needs to complete the
		 * record marker header.  When it is positive or zero, it
		 * holds the number of bytes that have arrived for the
		 * current fragment and are held in mir_head_mp.
		 */

	int32_t	mir_frag_header;
		/*
		 * Fragment header as collected for the current fragment.
		 * It holds the last-fragment indicator and the number
		 * of bytes in the fragment.
		 */

	unsigned int
		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
		mir_hold_inbound : 1,	/* Hold inbound messages on server */
					/* side until outbound flow control */
					/* is relieved. */
		mir_closing : 1,	/* The stream is being closed */
		mir_inrservice : 1,	/* data queued or rd srv proc running */
		mir_inwservice : 1,	/* data queued or wr srv proc running */
		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
		/*
		 * On client streams, mir_clntreq is 0 or 1; it is set
		 * to 1 whenever a new request is sent out (mir_wput)
		 * and cleared when the timer fires (mir_timer).  If
		 * the timer fires with this value equal to 0, then the
		 * stream is considered idle and kRPC is notified.
		 */
		mir_clntreq : 1,
		/*
		 * On server streams, stop accepting messages
		 */
		mir_svc_no_more_msgs : 1,
		mir_listen_stream : 1,	/* listen end point */
		mir_unused : 1,		/* no longer used */
		mir_timer_call : 1,
		mir_junk_fill_thru_bit_31 : 21;

	int	mir_setup_complete;	/* server has initialized everything */
	timeout_id_t mir_timer_id;	/* Timer for idle checks */
	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
		/*
		 * This value is copied from clnt_idle_timeout or
		 * svc_idle_timeout during the appropriate ioctl.
		 * Kept in milliseconds.
		 */
	clock_t	mir_use_timestamp;	/* updated on client with each use */
		/*
		 * This value is set to lbolt
		 * every time a client stream sends or receives data.
		 * Even if the timer message arrives, we don't shut down
		 * the client unless:
		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
		 * This value is kept in HZ.
		 */

	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
		/*
		 * This pointer is set to &clnt_max_msg_size or
		 * &svc_max_msg_size during the appropriate ioctl.
		 */
	zoneid_t	mir_zoneid;	/* zone which pushed rpcmod */
	/* Server-side fields. */
	int	mir_ref_cnt;		/* Reference count: server side only */
					/* counts the number of references */
					/* that a kernel RPC server thread */
					/* (see svc_run()) has on this rpcmod */
					/* slot. Effectively, it is the */
					/* number of unprocessed messages */
					/* that have been passed up to the */
					/* kRPC layer */

	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
					/* T_DISCON_IND */

	/*
	 * these fields are for both client and server, but for debugging,
	 * it is easier to have these last in the structure.
	 */
	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
	kcondvar_t	mir_condvar;	/* synchronization. */
	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
} mir_t;
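
/*
 * Record-mark arithmetic, illustrated: a 100-byte RPC message sent as a
 * single, final fragment is preceded on the wire by the 4-byte record
 * mark 0x80000064 -- the high bit is MIR_LASTFRAG and the low 31 bits
 * are the fragment length (0x64 == 100).  While mir_rput() assembles a
 * fragment, mir_frag_len climbs from -4 to 0 as the four mark bytes are
 * shifted into mir_frag_header, and then counts data bytes until it
 * reaches (mir_frag_header & ~MIR_LASTFRAG).
 */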

void tmp_rput(queue_t *q, mblk_t *mp);

struct xprt_style_ops tmpops = {
	NULL,
	NULL,
	putnext,
	NULL,
	tmp_rput,
	NULL
};

void
tmp_rput(queue_t *q, mblk_t *mp)
{
	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
	struct T_info_ack *pptr;

	switch (mp->b_datap->db_type) {
	case M_PCPROTO:
		pptr = (struct T_info_ack *)mp->b_rptr;
		switch (pptr->PRIM_type) {
		case T_INFO_ACK:
			mutex_enter(&t->lock);
			t->info_ack = mp;
			cv_signal(&t->wait);
			mutex_exit(&t->lock);
			return;
		default:
			break;
		}
	default:
		break;
	}

	/*
	 * Not an info-ack, so free it. This is ok because we should
	 * not be receiving data until the open finishes: rpcmod
	 * is pushed well before the end-point is bound to an address.
	 */
	freemsg(mp);
}

int
rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	mblk_t *bp;
	struct temp_slot ts, *t;
	struct T_info_ack *pptr;
	int error = 0;

	ASSERT(q != NULL);
	/*
	 * Check for re-opens.
	 */
	if (q->q_ptr) {
		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
		    "rpcmodopen_end:(%s)", "q->qptr");
		return (0);
	}

	t = &ts;
	bzero(t, sizeof (*t));
	q->q_ptr = (void *)t;
	WR(q)->q_ptr = (void *)t;

	/*
	 * Allocate the required messages upfront.
	 */
	if ((bp = allocb_cred(sizeof (struct T_info_req) +
	    sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
		return (ENOBUFS);
	}

	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);

	t->ops = &tmpops;

	qprocson(q);
	bp->b_datap->db_type = M_PCPROTO;
	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
	bp->b_wptr += sizeof (struct T_info_req);
	putnext(WR(q), bp);

	mutex_enter(&t->lock);
	while (t->info_ack == NULL) {
		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
			error = EINTR;
			break;
		}
	}
	mutex_exit(&t->lock);

	if (error)
		goto out;

	pptr = (struct T_info_ack *)t->info_ack->b_rptr;

	if (pptr->SERV_type == T_CLTS) {
		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
	} else {
		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
	}

out:
	if (error)
		qprocsoff(q);

	freemsg(t->info_ack);
	mutex_destroy(&t->lock);
	cv_destroy(&t->wait);

	return (error);
}

void
rmm_rput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
}

void
rmm_rsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
}

void
rmm_wput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
}

void
rmm_wsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
}

int
rmm_close(queue_t *q, int flag, cred_t *crp)
{
	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}

/*
 * rpcmodopen - open routine gets called when the module gets pushed
 * onto the stream.
 */
/*ARGSUSED*/
int
rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	struct rpcm *rmp;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");

	/*
	 * Only sufficiently privileged users can use this module, and it
	 * is assumed that they will use this module properly, and NOT send
	 * bulk data from downstream.
	 */
	if (secpolicy_rpcmod_open(crp) != 0)
		return (EPERM);

	/*
	 * Allocate slot data structure.
	 */
	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);

	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
	rmp->rm_zoneid = rpc_zoneid();
	/*
	 * slot type will be set by kRPC client and server ioctl's
	 */
	rmp->rm_type = 0;

	q->q_ptr = (void *)rmp;
	WR(q)->q_ptr = (void *)rmp;

	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
	return (0);
}
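
/*
 * Note that rpcmodopen() leaves rm_type as 0.  Until a later M_IOCTL
 * with ioc_cmd RPC_CLIENT or RPC_SERVER arrives (handled in
 * rpcmodwput_other()), rpcmodrput() silently discards any inbound data.
 */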

/*
 * rpcmodclose - This routine gets called when the module gets popped
 * off of the stream.
 */
/*ARGSUSED*/
int
rpcmodclose(queue_t *q, int flag, cred_t *crp)
{
	struct rpcm *rmp;

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	/*
	 * Mark our state as closing.
	 */
	mutex_enter(&rmp->rm_lock);
	rmp->rm_state |= RM_CLOSING;

	/*
	 * Check and see if there are any messages on the queue.  If so, send
	 * the messages, regardless of whether the downstream module is ready
	 * to accept data.
	 */
	if (rmp->rm_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);

		qenable(WR(q));

		if (rmp->rm_ref) {
			mutex_exit(&rmp->rm_lock);
			/*
			 * call into SVC to clean the queue
			 */
			svc_queueclean(q);
			mutex_enter(&rmp->rm_lock);

			/*
			 * Block while there are kRPC threads with a reference
			 * to this message.
			 */
			while (rmp->rm_ref)
				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
		}

		mutex_exit(&rmp->rm_lock);

		/*
		 * It is now safe to remove this queue from the stream. No kRPC
		 * threads have a reference to the stream, and none ever will,
		 * because RM_CLOSING is set.
		 */
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&rmp->rm_lock);
		qprocsoff(q);
	}

	q->q_ptr = NULL;
	WR(q)->q_ptr = NULL;
	mutex_destroy(&rmp->rm_lock);
	cv_destroy(&rmp->rm_cwait);
	kmem_free(rmp, sizeof (*rmp));
	return (0);
}

/*
 * rpcmodrput - Module read put procedure.  This is called from
 * the module, driver, or stream head downstream.
 */
void
rpcmodrput(queue_t *q, mblk_t *mp)
{
	struct rpcm *rmp;
	union T_primitives *pptr;
	int hdrsz;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	if (rmp->rm_type == 0) {
		freemsg(mp);
		return;
	}

	switch (mp->b_datap->db_type) {
	default:
		putnext(q, mp);
		break;

	case M_PROTO:
	case M_PCPROTO:
		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
		pptr = (union T_primitives *)mp->b_rptr;

		/*
		 * Forward this message to kRPC if it is data.
		 */
		if (pptr->type == T_UNITDATA_IND) {
			/*
			 * Check if the module is being popped.
			 */
			mutex_enter(&rmp->rm_lock);
			if (rmp->rm_state & RM_CLOSING) {
				mutex_exit(&rmp->rm_lock);
				putnext(q, mp);
				break;
			}

			switch (rmp->rm_type) {
			case RPC_CLIENT:
				mutex_exit(&rmp->rm_lock);
				hdrsz = mp->b_wptr - mp->b_rptr;

				/*
				 * Make sure the header is sane.
				 */
				if (hdrsz < TUNITDATAINDSZ ||
				    hdrsz < (pptr->unitdata_ind.OPT_length +
				    pptr->unitdata_ind.OPT_offset) ||
				    hdrsz < (pptr->unitdata_ind.SRC_length +
				    pptr->unitdata_ind.SRC_offset)) {
					freemsg(mp);
					return;
				}

				/*
				 * Call clnt_clts_dispatch_notify, so that it
				 * can pass the message to the proper caller.
				 * Don't discard the header just yet since the
				 * client may need the sender's address.
				 */
				clnt_clts_dispatch_notify(mp, hdrsz,
				    rmp->rm_zoneid);
				return;
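
			/*
			 * In the server case below, every message accepted by
			 * svc_queuereq() takes a reference on this slot
			 * (rm_ref).  rpcmod_release() drops the reference once
			 * the kRPC server thread is done with the message,
			 * which is what rpcmodclose() waits for before
			 * dismantling the slot.
			 */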
			case RPC_SERVER:
				/*
				 * rm_krpc_cell is exclusively used by the kRPC
				 * CLTS server. Try to submit the message to
				 * kRPC. Since this is an unreliable channel, we
				 * can just free the message in case the kRPC
				 * does not accept new messages.
				 */
				if (rmp->rm_krpc_cell &&
				    svc_queuereq(q, mp, TRUE)) {
					/*
					 * Raise the reference count on this
					 * module to prevent it from being
					 * popped before kRPC generates the
					 * reply.
					 */
					rmp->rm_ref++;
					mutex_exit(&rmp->rm_lock);
				} else {
					mutex_exit(&rmp->rm_lock);
					freemsg(mp);
				}
				return;
			default:
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			} /* end switch(rmp->rm_type) */
		} else if (pptr->type == T_UDERROR_IND) {
			mutex_enter(&rmp->rm_lock);
			hdrsz = mp->b_wptr - mp->b_rptr;

			/*
			 * Make sure the header is sane
			 */
			if (hdrsz < TUDERRORINDSZ ||
			    hdrsz < (pptr->uderror_ind.OPT_length +
			    pptr->uderror_ind.OPT_offset) ||
			    hdrsz < (pptr->uderror_ind.DEST_length +
			    pptr->uderror_ind.DEST_offset)) {
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			}

			/*
			 * In the case where a unit data error has been
			 * received, all we need to do is clear the message from
			 * the queue.
			 */
			mutex_exit(&rmp->rm_lock);
			freemsg(mp);
			RPCLOG(32, "rpcmodrput: unitdata error received at "
			    "%ld\n", gethrestime_sec());
			return;
		} /* end else if (pptr->type == T_UDERROR_IND) */

		putnext(q, mp);
		break;
	} /* end switch (mp->b_datap->db_type) */

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
	    "rpcmodrput_end:");
	/*
	 * Return codes are not looked at by the STREAMS framework.
	 */
}

/*
 * write put procedure
 */
void
rpcmodwput(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;

	ASSERT(q != NULL);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
	case M_PCPROTO:
		break;
	default:
		rpcmodwput_other(q, mp);
		return;
	}

	/*
	 * Check to see if we can send the message downstream.
	 */
	if (canputnext(q)) {
		putnext(q, mp);
		return;
	}

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * The first canputnext failed.  Try again except this time with the
	 * lock held, so that we can check the state of the stream to see if
	 * it is closing.  If either of these conditions evaluates to true
	 * then send the message.
	 */
	mutex_enter(&rmp->rm_lock);
	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
		mutex_exit(&rmp->rm_lock);
		putnext(q, mp);
	} else {
		/*
		 * canputnext failed again and the stream is not closing.
		 * Place the message on the queue and let the service
		 * procedure handle the message.
		 */
		mutex_exit(&rmp->rm_lock);
		(void) putq(q, mp);
	}
}

static void
rpcmodwput_other(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;
	struct iocblk	*iocp;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		iocp = (struct iocblk *)mp->b_rptr;
		ASSERT(iocp != NULL);
		switch (iocp->ioc_cmd) {
		case RPC_CLIENT:
		case RPC_SERVER:
			mutex_enter(&rmp->rm_lock);
			rmp->rm_type = iocp->ioc_cmd;
			mutex_exit(&rmp->rm_lock);
			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		default:
			/*
			 * pass the ioctl downstream and hope someone
			 * down there knows how to handle it.
			 */
			putnext(q, mp);
			return;
		}
	default:
		break;
	}
	/*
	 * This is something we definitely do not know how to handle, just
	 * pass the message downstream
	 */
	putnext(q, mp);
}

/*
 * Module write service procedure.  This is called by downstream modules
 * for back enabling during flow control.
 */
void
rpcmodwsrv(queue_t *q)
{
	struct rpcm	*rmp;
	mblk_t		*mp = NULL;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * Get messages that may be queued and send them downstream.
	 */
	while ((mp = getq(q)) != NULL) {
		/*
		 * Optimize the service procedure for the server-side, by
		 * avoiding a call to canputnext().
		 */
		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
			putnext(q, mp);
			continue;
		}
		(void) putbq(q, mp);
		return;
	}
}

void
rpcmod_hold(queue_t *q)
{
	struct rpcm *rmp = (struct rpcm *)q->q_ptr;

	mutex_enter(&rmp->rm_lock);
	rmp->rm_ref++;
	mutex_exit(&rmp->rm_lock);
}

void
rpcmod_release(queue_t *q, mblk_t *bp, bool_t enable __unused)
{
	struct rpcm *rmp;

	/*
	 * For now, just free the message.
	 */
	if (bp)
		freemsg(bp);
	rmp = (struct rpcm *)q->q_ptr;

	mutex_enter(&rmp->rm_lock);
	rmp->rm_ref--;

	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
		cv_broadcast(&rmp->rm_cwait);
	}

	mutex_exit(&rmp->rm_lock);
}

/*
 * This part of rpcmod is pushed on a connection-oriented transport for use
 * by RPC.  It serves to bypass the Stream head, implement
 * the record marking protocol, and dispatch incoming RPC messages.
 */

/* Default idle timer values */
#define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
#define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
#define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
#define	MIR_LASTFRAG	0x80000000	/* Record marker */

#define	MIR_SVC_QUIESCED(mir)	\
	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)

#define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
	(mir_ptr)->mir_inrservice = 0;	\
	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
	    (mir_ptr)->mir_closing)	\
		cv_signal(&(mir_ptr)->mir_condvar);	\
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))

static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void	mir_rput_proto(queue_t *q, mblk_t *mp);
static int	mir_svc_policy_notify(queue_t *q, int event);
static void	mir_svc_start(queue_t *wq);
static void	mir_svc_idle_start(queue_t *, mir_t *);
static void	mir_svc_idle_stop(queue_t *, mir_t *);
static void	mir_svc_start_close(queue_t *, mir_t *);
static void	mir_clnt_idle_do_stop(queue_t *);
static void	mir_clnt_idle_stop(queue_t *, mir_t *);
static void	mir_clnt_idle_start(queue_t *, mir_t *);
static void	mir_wput(queue_t *q, mblk_t *mp);
static void	mir_wput_other(queue_t *q, mblk_t *mp);
static void	mir_wsrv(queue_t *q);
static void	mir_disconnect(queue_t *, mir_t *ir);
static int	mir_check_len(queue_t *, mblk_t *);
static void	mir_timer(void *);

extern void	(*mir_start)(queue_t *);
extern void	(*clnt_stop_idle)(queue_t *);

clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
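
/*
 * These defaults are patchable globals.  Assuming the usual /etc/system
 * syntax for kernel module variables, something like
 *
 *	set rpcmod:clnt_idle_timeout = 0x927c0
 *
 * would raise the client default to 10 minutes (600000 milliseconds).
 * mir_timer() also mentions per-stream tuning through the ND variable
 * "mir_idle_timeout".
 */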

/*
 * Timeout for subsequent notifications of idle connection.  This is
 * typically used to clean up after a wedged orderly release.
 */
clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT;	/* milliseconds */

extern	uint_t	*clnt_max_msg_sizep;
extern	uint_t	*svc_max_msg_sizep;
uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
uint_t	mir_krpc_cell_null;

static void
mir_timer_stop(mir_t *mir)
{
	timeout_id_t tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	/*
	 * Since the mir_mutex lock needs to be released to call
	 * untimeout(), we need to make sure that no other thread
	 * can start/stop the timer (changing mir_timer_id) during
	 * that time.  The mir_timer_call bit and the mir_timer_cv
	 * condition variable are used to synchronize this.  Setting
	 * mir_timer_call also tells mir_timer() (refer to the comments
	 * in mir_timer()) that it does not need to do anything.
	 */
	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mir->mir_timer_id = 0;
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}

static void
mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
{
	timeout_id_t tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	/* Only start the timer when it is not closing. */
	if (!mir->mir_closing) {
		mir->mir_timer_id = timeout(mir_timer, q,
		    MSEC_TO_TICK(intrvl));
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}

static int
mir_clnt_dup_request(queue_t *q, mblk_t *mp)
{
	mblk_t	*mp1;
	uint32_t	new_xid;
	uint32_t	old_xid;

	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
	/*
	 * This loop is a bit tacky -- it walks the STREAMS list of
	 * flow-controlled messages.
	 */
	if ((mp1 = q->q_first) != NULL) {
		do {
			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
			if (new_xid == old_xid)
				return (1);
		} while ((mp1 = mp1->b_next) != NULL);
	}
	return (0);
}
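
/*
 * Layout assumed by mir_clnt_dup_request() above: by the time a request
 * sits on the write queue it already carries the 4-byte record mark
 * prepended by mir_wput(), so the RPC XID -- the first word of the RPC
 * call header -- lives at b_rptr[4..7], hence BE32_TO_U32(&mp->b_rptr[4]).
 */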

static int
mir_close(queue_t *q)
{
	mir_t	*mir = q->q_ptr;
	mblk_t	*mp;
	bool_t queue_cleaned = FALSE;

	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	mutex_enter(&mir->mir_mutex);
	if ((mp = mir->mir_head_mp) != NULL) {
		mir->mir_head_mp = NULL;
		mir->mir_tail_mp = NULL;
		freemsg(mp);
	}
	/*
	 * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
	 * is TRUE, and so that mir_timer_start() won't start the timer
	 * again.
	 */
	mir->mir_closing = B_TRUE;
	mir_timer_stop(mir);

	if (mir->mir_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */

		/*
		 * This will prevent more requests from arriving and
		 * will force rpcmod to ignore flow control.
		 */
		mir_svc_start_close(WR(q), mir);

		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {

			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
			    (queue_cleaned == FALSE)) {
				/*
				 * call into SVC to clean the queue
				 */
				mutex_exit(&mir->mir_mutex);
				svc_queueclean(q);
				queue_cleaned = TRUE;
				mutex_enter(&mir->mir_mutex);
				continue;
			}

			/*
			 * Bugid 1253810 - Force the write service
			 * procedure to send its messages, regardless
			 * of whether the downstream module is ready
			 * to accept data.
			 */
			if (mir->mir_inwservice == 1)
				qenable(WR(q));

			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
		}

		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);
	}

	mutex_destroy(&mir->mir_mutex);
	cv_destroy(&mir->mir_condvar);
	cv_destroy(&mir->mir_timer_cv);
	kmem_free(mir, sizeof (mir_t));
	return (0);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Exit idle mode.
 */
static void
mir_svc_idle_stop(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);

	mir_timer_stop(mir);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Start idle processing, which will include setting idle timer if the
 * stream is not being closed.
 */
static void
mir_svc_idle_start(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);

	/*
	 * Don't re-start idle timer if we are closing queues.
	 */
	if (mir->mir_closing) {
		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
		    (void *)q);

		/*
		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
		 * is true.  When it is true, and we are in the process of
		 * closing the stream, signal any thread waiting in
		 * mir_close().
		 */
		if (mir->mir_inwservice == 0)
			cv_signal(&mir->mir_condvar);

	} else {
		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
		    mir->mir_ordrel_pending ? "ordrel" : "normal");
		/*
		 * Normal condition, start the idle timer.  If an orderly
		 * release has been sent, set the timeout to wait for the
		 * client to close its side of the connection.  Otherwise,
		 * use the normal idle timeout.
		 */
		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
		    svc_ordrel_timeout : mir->mir_idle_timeout);
	}
}
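
/*
 * Server-side idle-timer lifecycle, roughly: mir_svc_release() calls
 * mir_svc_idle_start() when the last kRPC reference goes away and the
 * read queue is drained; mir_rput()/mir_rsrv() call mir_svc_idle_stop()
 * when a new request takes the stream out of the idle state; and if the
 * timer does fire, mir_timer() asks the daemon that owns the stream to
 * close the idle connection via mir_svc_policy_notify().
 */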

/* ARGSUSED */
static int
mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
{
	mir_t	*mir;

	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
	/* Set variables used directly by kRPC. */
	if (!mir_start)
		mir_start = mir_svc_start;
	if (!clnt_stop_idle)
		clnt_stop_idle = mir_clnt_idle_do_stop;
	if (!clnt_max_msg_sizep)
		clnt_max_msg_sizep = &clnt_max_msg_size;
	if (!svc_max_msg_sizep)
		svc_max_msg_sizep = &svc_max_msg_size;

	/* Allocate a zero'ed out mir structure for this stream. */
	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);

	/*
	 * We set hold inbound here so that incoming messages will
	 * be held on the read-side queue until the stream is completely
	 * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
	 * the ioctl processing, the flag is cleared and any messages that
	 * arrived between the open and the ioctl are delivered to kRPC.
	 *
	 * Early data should never arrive on a client stream since
	 * servers only respond to our requests and we do not send any
	 * until after the stream is initialized.  Early data is
	 * very common on a server stream where the client will start
	 * sending data as soon as the connection is made (and this
	 * is especially true with TCP where the protocol accepts the
	 * connection before nfsd or kRPC is notified about it).
	 */

	mir->mir_hold_inbound = 1;

	/*
	 * Start the record marker looking for a 4-byte header.  When
	 * this length is negative, it indicates that rpcmod is looking
	 * for bytes to consume for the record marker header.  When it
	 * is positive, it holds the number of bytes that have arrived
	 * for the current fragment and are being held in mir_head_mp.
	 */

	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);

	mir->mir_zoneid = rpc_zoneid();
	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);

	q->q_ptr = (char *)mir;
	WR(q)->q_ptr = (char *)mir;

	/*
	 * We noenable the read-side queue because we don't want it
	 * automatically enabled by putq.  We enable it explicitly
	 * in mir_wsrv when appropriate. (See additional comments on
	 * flow control at the beginning of mir_rsrv.)
	 */
	noenable(q);

	qprocson(q);
	return (0);
}

/*
 * Read-side put routine for both the client and server side.  Does the
 * record marking for incoming RPC messages, and when complete, dispatches
 * the message to either the client or server.
 */
static void
mir_rput(queue_t *q, mblk_t *mp)
{
	int	excess;
	int32_t	frag_len, frag_header;
	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
	mir_t	*mir = q->q_ptr;
	boolean_t stop_timer = B_FALSE;

	ASSERT(mir != NULL);

	/*
	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
	 * with the corresponding ioctl, then don't accept
	 * any inbound data.  This should never happen for streams
	 * created by nfsd or client-side kRPC because they are careful
	 * to set the mode of the stream before doing anything else.
	 */
	if (mir->mir_type == 0) {
		freemsg(mp);
		return;
	}

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	switch (mp->b_datap->db_type) {
	case M_DATA:
		break;
	case M_PROTO:
	case M_PCPROTO:
		if (MBLKL(mp) < sizeof (t_scalar_t)) {
			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
			    (int)MBLKL(mp));
			freemsg(mp);
			return;
		}
		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
			mir_rput_proto(q, mp);
			return;
		}
		/* Throw away the T_DATA_IND block and continue with data. */
		mp1 = mp;
		mp = mp->b_cont;
		freeb(mp1);
		break;
	case M_SETOPTS:
		/*
		 * If a module on the stream is trying to set the Stream head's
		 * high water mark, then set our hiwater to the requested
		 * value.  We are the "stream head" for all inbound
		 * data messages since messages are passed directly to kRPC.
		 */
		if (MBLKL(mp) >= sizeof (struct stroptions)) {
			struct stroptions	*stropts;

			stropts = (struct stroptions *)mp->b_rptr;
			if ((stropts->so_flags & SO_HIWAT) &&
			    !(stropts->so_flags & SO_BAND)) {
				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
			}
		}
		putnext(q, mp);
		return;
	case M_FLUSH:
		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
		RPCLOG(32, "on q 0x%p\n", (void *)q);
		putnext(q, mp);
		return;
	default:
		putnext(q, mp);
		return;
	}

	mutex_enter(&mir->mir_mutex);

	/*
	 * If this connection is closing, don't accept any new messages.
	 */
	if (mir->mir_svc_no_more_msgs) {
		ASSERT(mir->mir_type == RPC_SERVER);
		mutex_exit(&mir->mir_mutex);
		freemsg(mp);
		return;
	}

	/* Get local copies for quicker access. */
	frag_len = mir->mir_frag_len;
	frag_header = mir->mir_frag_header;
	head_mp = mir->mir_head_mp;
	tail_mp = mir->mir_tail_mp;

	/* Loop, processing each message block in the mp chain separately. */
	do {
		cont_mp = mp->b_cont;
		mp->b_cont = NULL;

		/*
		 * Drop zero-length mblks to prevent unbounded kernel memory
		 * consumption.
		 */
		if (MBLKL(mp) == 0) {
			freeb(mp);
			continue;
		}

		/*
		 * If frag_len is negative, we're still in the process of
		 * building frag_header -- try to complete it with this mblk.
		 */
		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
			frag_len++;
			frag_header <<= 8;
			frag_header += *mp->b_rptr++;
		}

		if (MBLKL(mp) == 0 && frag_len < 0) {
			/*
			 * We consumed this mblk while trying to complete the
			 * fragment header.  Free it and move on.
			 */
			freeb(mp);
			continue;
		}

		ASSERT(frag_len >= 0);

		/*
		 * Now frag_header has the number of bytes in this fragment
		 * and we're just waiting to collect them all.  Chain our
		 * latest mblk onto the list and see if we now have enough
		 * bytes to complete the fragment.
		 */
		if (head_mp == NULL) {
			ASSERT(tail_mp == NULL);
			head_mp = tail_mp = mp;
		} else {
			tail_mp->b_cont = mp;
			tail_mp = mp;
		}

		frag_len += MBLKL(mp);
		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
		if (excess < 0) {
			/*
			 * We still haven't received enough data to complete
			 * the fragment, so continue on to the next mblk.
			 */
			continue;
		}
		/*
		 * We've got a complete fragment.  If there are excess bytes,
		 * then they're part of the next fragment's header (of either
		 * this RPC message or the next RPC message).  Split that part
		 * into its own mblk so that we can safely freeb() it when
		 * building frag_header above.
		 */
		if (excess > 0) {
			if ((mp1 = dupb(mp)) == NULL &&
			    (mp1 = copyb(mp)) == NULL) {
				freemsg(head_mp);
				freemsg(cont_mp);
				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
				mir->mir_frag_header = 0;
				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
				mir->mir_head_mp = NULL;
				mir->mir_tail_mp = NULL;
				mir_disconnect(q, mir);	/* drops mir_mutex */
				return;
			}

			/*
			 * Relink the message chain so that the next mblk is
			 * the next fragment header, followed by the rest of
			 * the message chain.
			 */
			mp1->b_cont = cont_mp;
			cont_mp = mp1;

			/*
			 * Data in the new mblk begins at the next fragment,
			 * and data in the old mblk ends at the next fragment.
			 */
			mp1->b_rptr = mp1->b_wptr - excess;
			mp->b_wptr -= excess;
		}

		/*
		 * Reset frag_len and frag_header for the next fragment.
		 */
		frag_len = -(int32_t)sizeof (uint32_t);
		if (!(frag_header & MIR_LASTFRAG)) {
			/*
			 * The current fragment is complete, but more
			 * fragments need to be processed before we can
			 * pass along the RPC message headed at head_mp.
			 */
			frag_header = 0;
			continue;
		}
		frag_header = 0;

		/*
		 * We've got a complete RPC message; pass it to the
		 * appropriate consumer.
		 */
		switch (mir->mir_type) {
		case RPC_CLIENT:
			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
				/*
				 * Mark this stream as active.  This marker
				 * is used in mir_timer().
				 */
				mir->mir_clntreq = 1;
				mir->mir_use_timestamp = ddi_get_lbolt();
			} else {
				freemsg(head_mp);
			}
			break;

		case RPC_SERVER:
			/*
			 * Check for flow control before passing the
			 * message to kRPC.
			 */
			if (!mir->mir_hold_inbound) {
				if (mir->mir_krpc_cell) {

					if (mir_check_len(q, head_mp))
						return;

					if (q->q_first == NULL &&
					    svc_queuereq(q, head_mp, TRUE)) {
						/*
						 * If the reference count is 0
						 * (not including this
						 * request), then the stream is
						 * transitioning from idle to
						 * non-idle.  In this case, we
						 * cancel the idle timer.
						 */
						if (mir->mir_ref_cnt++ == 0)
							stop_timer = B_TRUE;
					} else {
						(void) putq(q, head_mp);
						mir->mir_inrservice = B_TRUE;
					}
				} else {
					/*
					 * Count # of times this happens. Should
					 * be never, but experience shows
					 * otherwise.
					 */
					mir_krpc_cell_null++;
					freemsg(head_mp);
				}
			} else {
				/*
				 * If the outbound side of the stream is
				 * flow controlled, then hold this message
				 * until client catches up. mir_hold_inbound
				 * is set in mir_wput and cleared in mir_wsrv.
				 */
				(void) putq(q, head_mp);
				mir->mir_inrservice = B_TRUE;
			}
			break;
		default:
			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
			    mir->mir_type);
			freemsg(head_mp);
			break;
		}

		/*
		 * Reset the chain since we're starting on a new RPC message.
		 */
		head_mp = tail_mp = NULL;
	} while ((mp = cont_mp) != NULL);

	/*
	 * Sanity check the message length; if it's too large mir_check_len()
	 * will shutdown the connection, drop mir_mutex, and return non-zero.
	 */
	if (head_mp != NULL && mir->mir_setup_complete &&
	    mir_check_len(q, head_mp))
		return;
	/* Save our local copies back in the mir structure. */
	mir->mir_frag_header = frag_header;
	mir->mir_frag_len = frag_len;
	mir->mir_head_mp = head_mp;
	mir->mir_tail_mp = tail_mp;

	/*
	 * The timer is stopped after the whole message chain is processed,
	 * rather than inside the loop.  The reason is that stopping the
	 * timer releases the mir_mutex lock temporarily.  This means that
	 * the request can be serviced while we are still processing the
	 * message chain.  This is not good.  So we stop the timer here
	 * instead.
	 *
	 * Note that if the timer fires before we stop it, it will not
	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
	 * will just return.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
		    "ref cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}
	mutex_exit(&mir->mir_mutex);
}

static void
mir_rput_proto(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	uint32_t	type;
	uint32_t reason = 0;

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	type = ((union T_primitives *)mp->b_rptr)->type;
	switch (mir->mir_type) {
	case RPC_CLIENT:
		switch (type) {
		case T_DISCON_IND:
			reason = ((struct T_discon_ind *)
			    (mp->b_rptr))->DISCON_reason;
			/*FALLTHROUGH*/
		case T_ORDREL_IND:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_head_mp) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = (mblk_t *)0;
				mir->mir_tail_mp = (mblk_t *)0;
			}
			/*
			 * We are disconnecting, but not necessarily
			 * closing.  By not closing, we will fail to
			 * pick up a possibly changed global timeout value,
			 * unless we store it now.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(WR(q), mir);

			/*
			 * Even though we are unconnected, we still
			 * leave the idle timer going on the client.  The
			 * reason is that if we've disconnected due
			 * to a server-side disconnect, reset, or connection
			 * timeout, there is a possibility the client may
			 * retry the RPC request.  This retry needs to be
			 * done on the same bound address for the server to
			 * interpret it as such.  However, we don't want
			 * to wait forever for that possibility.  If the
			 * end-point stays unconnected for mir_idle_timeout
			 * units of time, then that is a signal to the
			 * connection manager to give up waiting for the
			 * application (e.g. NFS) to send a retry.
			 */
			mir_clnt_idle_start(WR(q), mir);
			mutex_exit(&mir->mir_mutex);
			clnt_dispatch_notifyall(WR(q), type, reason);
			freemsg(mp);
			return;
		case T_ERROR_ACK:
		{
			struct T_error_ack	*terror;

			terror = (struct T_error_ack *)mp->b_rptr;
			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
			    (void *)q);
			RPCLOG(1, " ERROR_prim: %s,",
			    rpc_tpiprim2name(terror->ERROR_prim));
			RPCLOG(1, " TLI_error: %s,",
			    rpc_tpierr2name(terror->TLI_error));
			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
			if (terror->ERROR_prim == T_DISCON_REQ) {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_OK_ACK:
		{
			struct T_ok_ack	*tok = (struct T_ok_ack *)mp->b_rptr;

			if (tok->CORRECT_prim == T_DISCON_REQ) {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_CONN_CON:
		case T_INFO_ACK:
		case T_OPTMGMT_ACK:
			if (clnt_dispatch_notifyconn(WR(q), mp))
				return;
			break;
		case T_BIND_ACK:
			break;
		default:
			RPCLOG(1, "mir_rput: unexpected message %d "
			    "for kRPC client\n",
			    ((union T_primitives *)mp->b_rptr)->type);
			break;
		}
		break;

	case RPC_SERVER:
		switch (type) {
		case T_BIND_ACK:
		{
			struct T_bind_ack	*tbind;

			/*
			 * If this is a listening stream, then shut
			 * off the idle timer.
			 */
			tbind = (struct T_bind_ack *)mp->b_rptr;
			if (tbind->CONIND_number > 0) {
				mutex_enter(&mir->mir_mutex);
				mir_svc_idle_stop(WR(q), mir);

				/*
				 * mark this as a listen endpoint
				 * for special handling.
				 */

				mir->mir_listen_stream = 1;
				mutex_exit(&mir->mir_mutex);
			}
			break;
		}
		case T_DISCON_IND:
		case T_ORDREL_IND:
			RPCLOG(16, "mir_rput_proto: got %s indication\n",
			    type == T_DISCON_IND ? "disconnect"
			    : "orderly release");

			/*
			 * For listen endpoint just pass
			 * on the message.
			 */

			if (mir->mir_listen_stream)
				break;

			mutex_enter(&mir->mir_mutex);

			/*
			 * If client wants to break off connection, record
			 * that fact.
			 */
			mir_svc_start_close(WR(q), mir);

			/*
			 * If we are idle, then send the orderly release
			 * or disconnect indication to nfsd.
			 */
			if (MIR_SVC_QUIESCED(mir)) {
				mutex_exit(&mir->mir_mutex);
				break;
			}

			RPCLOG(16, "mir_rput_proto: not idle, so "
			    "disconnect/ord rel indication not passed "
			    "upstream on 0x%p\n", (void *)q);

			/*
			 * Hold the indication until we get idle.
			 * If there already is an indication stored,
			 * replace it if the new one is a disconnect.  The
			 * reasoning is that disconnection takes less time
			 * to process, and once a client decides to
			 * disconnect, we should do that.
			 */
			if (mir->mir_svc_pend_mp) {
				if (type == T_DISCON_IND) {
					RPCLOG(16, "mir_rput_proto: replacing"
					    " held disconnect/ord rel"
					    " indication with disconnect on"
					    " 0x%p\n", (void *)q);

					freemsg(mir->mir_svc_pend_mp);
					mir->mir_svc_pend_mp = mp;
				} else {
					RPCLOG(16, "mir_rput_proto: already "
					    "held a disconnect/ord rel "
					    "indication. freeing ord rel "
					    "ind on 0x%p\n", (void *)q);
					freemsg(mp);
				}
			} else
				mir->mir_svc_pend_mp = mp;

			mutex_exit(&mir->mir_mutex);
			return;

		default:
			/* nfsd handles server-side non-data messages. */
			break;
		}
		break;

	default:
		break;
	}

	putnext(q, mp);
}

/*
 * The server-side read queues are used to hold inbound messages while
 * outbound flow control is exerted.  When outbound flow control is
 * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
 * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
 */
static void
mir_rsrv(queue_t *q)
{
	mir_t	*mir;
	mblk_t	*mp;
	boolean_t stop_timer = B_FALSE;

	mir = (mir_t *)q->q_ptr;
	mutex_enter(&mir->mir_mutex);

	mp = NULL;
	switch (mir->mir_type) {
	case RPC_SERVER:
		if (mir->mir_ref_cnt == 0)
			mir->mir_hold_inbound = 0;
		if (mir->mir_hold_inbound)
			break;

		while (mp = getq(q)) {
			if (mir->mir_krpc_cell &&
			    (mir->mir_svc_no_more_msgs == 0)) {

				if (mir_check_len(q, mp))
					return;

				if (svc_queuereq(q, mp, TRUE)) {
					/*
					 * If we were idle, turn off idle timer
					 * since we aren't idle any more.
					 */
					if (mir->mir_ref_cnt++ == 0)
						stop_timer = B_TRUE;
				} else {
					(void) putbq(q, mp);
					break;
				}
			} else {
				/*
				 * Count # of times this happens. Should be
				 * never, but experience shows otherwise.
				 */
				if (mir->mir_krpc_cell == NULL)
					mir_krpc_cell_null++;
				freemsg(mp);
			}
		}
		break;
	case RPC_CLIENT:
		break;
	default:
		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);

		if (q->q_first == NULL)
			MIR_CLEAR_INRSRV(mir);

		mutex_exit(&mir->mir_mutex);

		return;
	}

	/*
	 * The timer is stopped after all the messages are processed,
	 * rather than inside the loop.  The reason is that stopping the
	 * timer releases the mir_mutex lock temporarily.  This means
	 * that the request can be serviced while we are still processing
	 * the message queue.  This is not good.  So we stop the timer
	 * here instead.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
		    "cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}

	if (q->q_first == NULL) {
		mblk_t	*cmp = NULL;

		MIR_CLEAR_INRSRV(mir);

		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
			cmp = mir->mir_svc_pend_mp;
			mir->mir_svc_pend_mp = NULL;
		}

		mutex_exit(&mir->mir_mutex);

		if (cmp != NULL) {
			RPCLOG(16, "mir_rsrv: line %d: sending a held "
			    "disconnect/ord rel indication upstream\n",
			    __LINE__);
			putnext(q, cmp);
		}

		return;
	}
	mutex_exit(&mir->mir_mutex);
}
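
/*
 * The event code sent by mir_svc_policy_notify() below is consumed by
 * the user-level daemon (nfsd, lockd, or whoever created the stream);
 * mir_timer() sends event 1 to ask the daemon to close an idle
 * connection.
 */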
static int mir_svc_policy_fails;

/*
 * Called to send an event code to nfsd/lockd so that it initiates
 * connection close.
 */
static int
mir_svc_policy_notify(queue_t *q, int event)
{
	mblk_t	*mp;
#ifdef DEBUG
	mir_t *mir = (mir_t *)q->q_ptr;
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
#endif
	ASSERT(q->q_flag & QREADR);

	/*
	 * Create an M_DATA message with the event code and pass it to the
	 * Stream head (nfsd or whoever created the stream will consume it).
	 */
	mp = allocb(sizeof (int), BPRI_HI);

	if (!mp) {

		mir_svc_policy_fails++;
		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
		    "%d\n", event);
		return (ENOMEM);
	}

	U32_TO_BE32(event, mp->b_rptr);
	mp->b_wptr = mp->b_rptr + sizeof (int);
	putnext(q, mp);
	return (0);
}

/*
 * Server side: start the close phase.  We want to get this rpcmod slot in an
 * idle state before mir_close() is called.
 */
static void
mir_svc_start_close(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);

	/*
	 * Do not accept any more messages.
	 */
	mir->mir_svc_no_more_msgs = 1;

	/*
	 * Next two statements will make the read service procedure
	 * free everything stuck in the streams read queue.
	 * It's not necessary because enabling the write queue will
	 * have the same effect, but why not speed the process along?
	 */
	mir->mir_hold_inbound = 0;
	qenable(RD(wq));

	/*
	 * Meanwhile force the write service procedure to send the
	 * responses downstream, regardless of flow control.
	 */
	qenable(wq);
}

void
mir_svc_hold(queue_t *wq)
{
	mir_t *mir = (mir_t *)wq->q_ptr;

	mutex_enter(&mir->mir_mutex);
	mir->mir_ref_cnt++;
	mutex_exit(&mir->mir_mutex);
}

/*
 * This routine is called directly by kRPC after a request is completed,
 * whether a reply was sent or the request was dropped.
 */
void
mir_svc_release(queue_t *wq, mblk_t *mp, bool_t enable)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;
	mblk_t	*cmp = NULL;

	ASSERT((wq->q_flag & QREADR) == 0);
	if (mp)
		freemsg(mp);

	if (enable)
		qenable(RD(wq));

	mutex_enter(&mir->mir_mutex);

	/*
	 * Start idle processing if this is the last reference.
	 */
	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
		cmp = mir->mir_svc_pend_mp;
		mir->mir_svc_pend_mp = NULL;
	}

	if (cmp) {
		RPCLOG(16, "mir_svc_release: sending a held "
		    "disconnect/ord rel indication upstream on queue 0x%p\n",
		    (void *)RD(wq));

		mutex_exit(&mir->mir_mutex);

		putnext(RD(wq), cmp);

		mutex_enter(&mir->mir_mutex);
	}

	/*
	 * Start idle processing if this is the last reference.
	 */
	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {

		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
		    "because ref cnt is zero\n", (void *) wq);

		mir_svc_idle_start(wq, mir);
	}

	mir->mir_ref_cnt--;
	ASSERT(mir->mir_ref_cnt >= 0);

	/*
	 * Wake up the thread waiting to close.
	 */

	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
		cv_signal(&mir->mir_condvar);

	mutex_exit(&mir->mir_mutex);
}
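
/*
 * Hold/release pairing, roughly: mir_rput()/mir_rsrv() bump mir_ref_cnt
 * for every request handed to svc_queuereq(), and kRPC drops each
 * reference through mir_svc_release() when the request is done
 * (mir_svc_hold() presumably exists for kRPC to take extra references).
 * The count reaching zero is what allows the idle timer to start and
 * mir_close() to complete.
 */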
/*
 * This routine is called by server-side kRPC when it is ready to
 * handle inbound messages on the stream.
 */
static void
mir_svc_start(queue_t *wq)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;

	/*
	 * No longer need to take the mir_mutex because the
	 * mir_setup_complete field has been moved out of
	 * the bit field protected by the mir_mutex.
	 */

	mir->mir_setup_complete = 1;
	qenable(RD(wq));
}

/*
 * client side wrapper for stopping timer with normal idle timeout.
 */
static void
mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_CLIENT);

	mir_timer_stop(mir);
}

/*
 * client side wrapper for starting timer with normal idle timeout.
 */
static void
mir_clnt_idle_start(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_CLIENT);

	mir_timer_start(wq, mir, mir->mir_idle_timeout);
}

/*
 * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
 * end-points that aren't connected.
 */
static void
mir_clnt_idle_do_stop(queue_t *wq)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;

	RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	mutex_enter(&mir->mir_mutex);
	mir_clnt_idle_stop(wq, mir);
	mutex_exit(&mir->mir_mutex);
}

/*
 * Timer handler.  It handles idle timeout and memory shortage problems.
 */
static void
mir_timer(void *arg)
{
	queue_t *wq = (queue_t *)arg;
	mir_t *mir = (mir_t *)wq->q_ptr;
	boolean_t notify;
	clock_t now;

	mutex_enter(&mir->mir_mutex);

	/*
	 * mir_timer_call is set only when either mir_timer_[start|stop]
	 * is progressing.  And mir_timer() can only be run while they
	 * are progressing if the timer is being stopped.  So just
	 * return.
	 */
	if (mir->mir_timer_call) {
		mutex_exit(&mir->mir_mutex);
		return;
	}
	mir->mir_timer_id = 0;

	switch (mir->mir_type) {
	case RPC_CLIENT:

		/*
		 * For clients, the timer fires at clnt_idle_timeout
		 * intervals.  If the activity marker (mir_clntreq) is
		 * zero, then the stream has been idle since the last
		 * timer event and we notify kRPC.  If mir_clntreq is
		 * non-zero, then the stream is active and we just
		 * restart the timer for another interval.  mir_clntreq
		 * is set to 1 in mir_wput for every request passed
		 * downstream.
		 *
		 * If this was a memory shortage timer reset the idle
		 * timeout regardless; the mir_clntreq will not be a
		 * valid indicator.
		 *
		 * The timer is initially started in mir_wput during
		 * RPC_CLIENT ioctl processing.
		 *
		 * The timer interval can be changed for individual
		 * streams with the ND variable "mir_idle_timeout".
		 */
		now = ddi_get_lbolt();
		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
		    MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
			clock_t tout;

			tout = mir->mir_idle_timeout -
			    TICK_TO_MSEC(now - mir->mir_use_timestamp);
			if (tout < 0)
				tout = 1000;
#if 0
			printf("mir_timer[%d < %d + %d]: reset client timer "
			    "to %d (ms)\n", TICK_TO_MSEC(now),
			    TICK_TO_MSEC(mir->mir_use_timestamp),
			    mir->mir_idle_timeout, tout);
#endif
			mir->mir_clntreq = 0;
			mir_timer_start(wq, mir, tout);
			mutex_exit(&mir->mir_mutex);
			return;
		}
#if 0
		printf("mir_timer[%d]: doing client timeout\n", now / hz);
#endif

/*
 * Timer handler.  It handles idle timeouts and memory shortages.
 */
static void
mir_timer(void *arg)
{
	queue_t *wq = (queue_t *)arg;
	mir_t *mir = (mir_t *)wq->q_ptr;
	boolean_t notify;
	clock_t now;

	mutex_enter(&mir->mir_mutex);

	/*
	 * mir_timer_call is set only when either mir_timer_[start|stop]
	 * is progressing.  And mir_timer() can only run while they
	 * are progressing if the timer is being stopped.  So just
	 * return.
	 */
	if (mir->mir_timer_call) {
		mutex_exit(&mir->mir_mutex);
		return;
	}
	mir->mir_timer_id = 0;

	switch (mir->mir_type) {
	case RPC_CLIENT:

		/*
		 * For clients, the timer fires at clnt_idle_timeout
		 * intervals.  If the activity marker (mir_clntreq) is
		 * zero, then the stream has been idle since the last
		 * timer event and we notify kRPC.  If mir_clntreq is
		 * non-zero, then the stream is active and we just
		 * restart the timer for another interval.  mir_clntreq
		 * is set to 1 in mir_wput for every request passed
		 * downstream.
		 *
		 * If this was a memory shortage timer, reset the idle
		 * timeout regardless; mir_clntreq will not be a
		 * valid indicator.
		 *
		 * The timer is initially started in mir_wput during
		 * RPC_CLIENT ioctl processing.
		 *
		 * The timer interval can be changed for individual
		 * streams with the ND variable "mir_idle_timeout".
		 */
		now = ddi_get_lbolt();
		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
		    MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
			clock_t tout;

			tout = mir->mir_idle_timeout -
			    TICK_TO_MSEC(now - mir->mir_use_timestamp);
			if (tout < 0)
				tout = 1000;
#if 0
			printf("mir_timer[%d < %d + %d]: reset client timer "
			    "to %d (ms)\n", TICK_TO_MSEC(now),
			    TICK_TO_MSEC(mir->mir_use_timestamp),
			    mir->mir_idle_timeout, tout);
#endif
			mir->mir_clntreq = 0;
			mir_timer_start(wq, mir, tout);
			mutex_exit(&mir->mir_mutex);
			return;
		}
#if 0
		printf("mir_timer[%d]: doing client timeout\n", now / hz);
#endif
		/*
		 * We are disconnecting, but not necessarily
		 * closing.  By not closing, we will fail to
		 * pick up a possibly changed global timeout value,
		 * unless we store it now.
		 */
		mir->mir_idle_timeout = clnt_idle_timeout;
		mir_clnt_idle_start(wq, mir);

		mutex_exit(&mir->mir_mutex);
		/*
		 * We pass T_ORDREL_REQ as an integer value
		 * to kRPC as the indication that the stream
		 * is idle.  This is not a T_ORDREL_REQ message;
		 * it is just a convenient value since we call
		 * the same kRPC routine for T_ORDREL_INDs and
		 * T_DISCON_INDs.
		 */
		clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
		return;

	case RPC_SERVER:

		/*
		 * For servers, the timer is only running when the stream
		 * is really idle or memory is short.  The timer is started
		 * by mir_wput when mir_type is set to RPC_SERVER and
		 * by mir_svc_idle_start whenever the stream goes idle
		 * (mir_ref_cnt == 0).  The timer is cancelled in
		 * mir_rput whenever a new inbound request is passed to kRPC
		 * and the stream was previously idle.
		 *
		 * The timer interval can be changed for individual
		 * streams with the ND variable "mir_idle_timeout".
		 *
		 * If the stream is not idle, do nothing.
		 */
		if (!MIR_SVC_QUIESCED(mir)) {
			mutex_exit(&mir->mir_mutex);
			return;
		}

		notify = !mir->mir_inrservice;
		mutex_exit(&mir->mir_mutex);

		/*
		 * If there is no packet queued up in the read queue, the
		 * stream is really idle so notify nfsd to close it.
		 */
		if (notify) {
			RPCLOG(16, "mir_timer: telling stream head listener "
			    "to close stream (0x%p)\n", (void *) RD(wq));
			(void) mir_svc_policy_notify(RD(wq), 1);
		}
		return;
	default:
		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
		    mir->mir_type);
		mutex_exit(&mir->mir_mutex);
		return;
	}
}
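
/*
 * A worked example of the record-marking header built by mir_wput()
 * below, assuming MIR_LASTFRAG is the standard RPC record-mark
 * last-fragment bit (1U << 31): for a 100-byte message sent as the
 * final fragment,
 *
 *	frag_header = 100 | MIR_LASTFRAG;	   yields 0x80000064
 *	*(uint32_t *)rptr = htonl(frag_header);	   big-endian on the wire
 *
 * The low 31 bits carry the fragment length; the peer strips each
 * header and reassembles fragments until it sees the last-frag bit.
 */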

/*
 * Called by the RPC package to send either a call or a return, or a
 * transport connection request.  Adds the record marking header.
 */
static void
mir_wput(queue_t *q, mblk_t *mp)
{
	uint_t	frag_header;
	mir_t	*mir = (mir_t *)q->q_ptr;
	uchar_t	*rptr = mp->b_rptr;

	if (!mir) {
		freemsg(mp);
		return;
	}

	if (mp->b_datap->db_type != M_DATA) {
		mir_wput_other(q, mp);
		return;
	}

	if (mir->mir_ordrel_pending == 1) {
		freemsg(mp);
		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
		    (void *)q);
		return;
	}

	frag_header = (uint_t)DLEN(mp);
	frag_header |= MIR_LASTFRAG;

	/* Stick in the 4 byte record marking header. */
	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
		/*
		 * Since we know that M_DATA messages are created exclusively
		 * by kRPC, we expect that kRPC will leave room for our header
		 * and 4 byte align which is normal for XDR.
		 * If kRPC (or someone else) does not cooperate, then we
		 * just throw away the message.
		 */
		RPCLOG(1, "mir_wput: kRPC did not leave space for record "
		    "fragment header (%d bytes left)\n",
		    (int)(rptr - mp->b_datap->db_base));
		freemsg(mp);
		return;
	}
	rptr -= sizeof (uint32_t);
	*(uint32_t *)rptr = htonl(frag_header);
	mp->b_rptr = rptr;

	mutex_enter(&mir->mir_mutex);
	if (mir->mir_type == RPC_CLIENT) {
		/*
		 * For the client, set mir_clntreq to indicate that the
		 * connection is active.
		 */
		mir->mir_clntreq = 1;
		mir->mir_use_timestamp = ddi_get_lbolt();
	}

	/*
	 * If we haven't already queued some data and the downstream module
	 * can accept more data, send it on, otherwise we queue the message
	 * and take other actions depending on mir_type.
	 */
	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
		mutex_exit(&mir->mir_mutex);

		/*
		 * Now we pass the RPC message downstream.
		 */
		putnext(q, mp);
		return;
	}

	switch (mir->mir_type) {
	case RPC_CLIENT:
		/*
		 * Check for a previous duplicate request on the
		 * queue.  If there is one, then we throw away
		 * the current message and let the previous one
		 * go through.  If we can't find a duplicate, then
		 * send this one.  This tap dance is an effort
		 * to reduce traffic and processing requirements
		 * under load conditions.
		 */
		if (mir_clnt_dup_request(q, mp)) {
			mutex_exit(&mir->mir_mutex);
			freemsg(mp);
			return;
		}
		break;
	case RPC_SERVER:
		/*
		 * Set mir_hold_inbound so that new inbound RPC
		 * messages will be held until the client catches
		 * up on the earlier replies.  This flag is cleared
		 * in mir_wsrv after flow control is relieved;
		 * the read-side queue is also enabled at that time.
		 */
		mir->mir_hold_inbound = 1;
		break;
	default:
		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
		break;
	}
	mir->mir_inwservice = 1;
	(void) putq(q, mp);
	mutex_exit(&mir->mir_mutex);
}
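
/*
 * In short, mir_wput()'s queueing decision above comes down to
 * (a sketch, assuming MIR_WCANPUTNEXT tests downstream flow control):
 *
 *	!mir_inwservice && canputnext	-> putnext() immediately
 *	client with queued duplicate	-> freemsg() the new request
 *	server, flow controlled		-> set mir_hold_inbound, then putq()
 *	otherwise			-> putq() for mir_wsrv to drain
 */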

static void
mir_wput_other(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	struct iocblk	*iocp;
	uchar_t	*rptr = mp->b_rptr;
	bool_t	flush_in_svc = FALSE;

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		iocp = (struct iocblk *)rptr;
		switch (iocp->ioc_cmd) {
		case RPC_CLIENT:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != 0 &&
			    mir->mir_type != iocp->ioc_cmd) {
			ioc_eperm:
				mutex_exit(&mir->mir_mutex);
				iocp->ioc_error = EPERM;
				iocp->ioc_count = 0;
				mp->b_datap->db_type = M_IOCACK;
				qreply(q, mp);
				return;
			}

			mir->mir_type = iocp->ioc_cmd;

			/*
			 * Clear mir_hold_inbound which was set to 1 by
			 * mir_open.  This flag is not used on client
			 * streams.
			 */
			mir->mir_hold_inbound = 0;
			mir->mir_max_msg_sizep = &clnt_max_msg_size;

			/*
			 * Start the idle timer.  See mir_timer() for more
			 * information on how client timers work.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		case RPC_SERVER:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != 0 &&
			    mir->mir_type != iocp->ioc_cmd)
				goto ioc_eperm;

			/*
			 * We don't clear mir_hold_inbound here because
			 * mir_hold_inbound is used in the flow control
			 * model.  If we cleared it here, then we'd commit
			 * a small violation of the model where the transport
			 * might immediately block downstream flow.
			 */

			mir->mir_type = iocp->ioc_cmd;
			mir->mir_max_msg_sizep = &svc_max_msg_size;

			/*
			 * Start the idle timer.  See mir_timer() for more
			 * information on how server timers work.
			 *
			 * Note that it is important to start the idle timer
			 * here so that connections time out even if we
			 * never receive any data on them.
			 */
			mir->mir_idle_timeout = svc_idle_timeout;
			RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
			    "because we got RPC_SERVER ioctl\n", (void *)q);
			mir_svc_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		default:
			break;
		}
		break;

	case M_PROTO:
		if (mir->mir_type == RPC_CLIENT) {
			/*
			 * We are likely being called from the context of a
			 * service procedure, so we need to enqueue.  However,
			 * enqueuing may put our message behind data messages,
			 * so flush the data first.
			 */
			flush_in_svc = TRUE;
		}
		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
			break;

		switch (((union T_primitives *)rptr)->type) {
		case T_DATA_REQ:
			/* Don't pass T_DATA_REQ messages downstream. */
			freemsg(mp);
			return;
		case T_ORDREL_REQ:
			RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
			    (void *)q);
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != RPC_SERVER) {
				/*
				 * We are likely being called from
				 * clnt_dispatch_notifyall().  Sending
				 * a T_ORDREL_REQ will result in some
				 * kind of _IND message being sent, which
				 * will trigger another call to
				 * clnt_dispatch_notifyall().  To keep the
				 * stack lean, queue this message.
				 */
				mir->mir_inwservice = 1;
				(void) putq(q, mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Mark the structure such that we don't accept any
			 * more requests from the client.  We could defer this
			 * until we actually send the orderly release
			 * request downstream, but all that does is delay
			 * the closing of this stream.
			 */
			RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
			    "so calling mir_svc_start_close\n", (void *)q);

			mir_svc_start_close(q, mir);

			/*
			 * If we have sent down a T_ORDREL_REQ, don't send
			 * any more.
			 */
			if (mir->mir_ordrel_pending) {
				freemsg(mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * If the stream is not idle, then we hold the
			 * orderly release until it becomes idle.  This
			 * ensures that kRPC will be able to reply to
			 * all requests that we have passed to it.
			 *
			 * We also queue the request if there is data already
			 * queued, because we cannot allow the T_ORDREL_REQ
			 * to go before data.  When we had a separate reply
			 * count, this was not a problem, because the
			 * reply count was reconciled when mir_wsrv()
			 * completed.
			 */
			if (!MIR_SVC_QUIESCED(mir) ||
			    mir->mir_inwservice == 1) {
				mir->mir_inwservice = 1;
				(void) putq(q, mp);

				RPCLOG(16, "mir_wput_other: queuing "
				    "T_ORDREL_REQ on 0x%p\n", (void *)q);

				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Mark the structure so that we know we sent an
			 * orderly release request, and reset the idle timer.
			 */
			mir->mir_ordrel_pending = 1;

			RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
			    " on 0x%p because we got T_ORDREL_REQ\n",
			    (void *)q);

			mir_svc_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			/*
			 * When we break, we will putnext the T_ORDREL_REQ.
			 */
			break;
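
		/*
		 * The T_CONN_REQ case below restarts the stream from a
		 * clean slate (a summary of the code that follows): any
		 * partially assembled record is dropped, the fragment
		 * length is reset, and the client idle timer is rearmed.
		 */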

		case T_CONN_REQ:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_head_mp != NULL) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = NULL;
				mir->mir_tail_mp = NULL;
			}
			mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
			/*
			 * Restart timer in case mir_clnt_idle_do_stop() was
			 * called.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(q, mir);
			mir_clnt_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);
			break;

		default:
			/*
			 * T_DISCON_REQ is one of the interesting default
			 * cases here.  Ideally, an M_FLUSH is done before
			 * T_DISCON_REQ is done.  However, that is somewhat
			 * cumbersome for clnt_cots.c to do.  So we queue
			 * T_DISCON_REQ, and let the service procedure
			 * flush all M_DATA.
			 */
			break;
		}
		/* FALLTHROUGH */
	default:
		if (mp->b_datap->db_type >= QPCTL) {
			if (mp->b_datap->db_type == M_FLUSH) {
				if (mir->mir_type == RPC_CLIENT &&
				    *mp->b_rptr & FLUSHW) {
					RPCLOG(32, "mir_wput_other: flushing "
					    "wq 0x%p\n", (void *)q);
					if (*mp->b_rptr & FLUSHBAND) {
						flushband(q, *(mp->b_rptr + 1),
						    FLUSHDATA);
					} else {
						flushq(q, FLUSHDATA);
					}
				} else {
					RPCLOG(32, "mir_wput_other: ignoring "
					    "M_FLUSH on wq 0x%p\n", (void *)q);
				}
			}
			break;
		}

		mutex_enter(&mir->mir_mutex);
		if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
			mutex_exit(&mir->mir_mutex);
			break;
		}
		mir->mir_inwservice = 1;
		mir->mir_inwflushdata = flush_in_svc;
		(void) putq(q, mp);
		mutex_exit(&mir->mir_mutex);
		qenable(q);

		return;
	}
	putnext(q, mp);
}
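
/*
 * A sketch of how the write service procedure below fits in: messages
 * land on the write queue via putq() in mir_wput() and mir_wput_other()
 * when the stream is flow controlled or a T_ORDREL_REQ must wait, and
 * mir_wsrv() drains them once STREAMS back-enables the queue or a
 * caller issues an explicit qenable().
 */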

static void
mir_wsrv(queue_t *q)
{
	mblk_t	*mp;
	mir_t	*mir;
	bool_t	flushdata;

	mir = (mir_t *)q->q_ptr;
	mutex_enter(&mir->mir_mutex);

	flushdata = mir->mir_inwflushdata;
	mir->mir_inwflushdata = 0;

	while (mp = getq(q)) {
		if (mp->b_datap->db_type == M_DATA) {
			/*
			 * Do not send any more data if we have sent
			 * a T_ORDREL_REQ.
			 */
			if (flushdata || mir->mir_ordrel_pending == 1) {
				freemsg(mp);
				continue;
			}

			/*
			 * Make sure that the stream can really handle more
			 * data.
			 */
			if (!MIR_WCANPUTNEXT(mir, q)) {
				(void) putbq(q, mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Now we pass the RPC message downstream.
			 */
			mutex_exit(&mir->mir_mutex);
			putnext(q, mp);
			mutex_enter(&mir->mir_mutex);
			continue;
		}

		/*
		 * This is not an RPC message, pass it downstream
		 * (ignoring flow control) if the server side is not sending a
		 * T_ORDREL_REQ downstream.
		 */
		if (mir->mir_type != RPC_SERVER ||
		    ((union T_primitives *)mp->b_rptr)->type !=
		    T_ORDREL_REQ) {
			mutex_exit(&mir->mir_mutex);
			putnext(q, mp);
			mutex_enter(&mir->mir_mutex);
			continue;
		}

		if (mir->mir_ordrel_pending == 1) {
			/*
			 * Don't send two T_ORDRELs
			 */
			freemsg(mp);
			continue;
		}

		/*
		 * Mark the structure so that we know we sent an orderly
		 * release request.  We will check to see if the slot is idle
		 * at the end of this routine, and if so, reset the idle timer
		 * to handle orderly release timeouts.
		 */
		mir->mir_ordrel_pending = 1;
		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
		    (void *)q);
		/*
		 * Send the orderly release downstream.  If there are other
		 * pending replies we won't be able to send them.  However,
		 * the only reason we should send the orderly release is if
		 * we were idle, or if an unusual event occurred.
		 */
		mutex_exit(&mir->mir_mutex);
		putnext(q, mp);
		mutex_enter(&mir->mir_mutex);
	}

	if (q->q_first == NULL)
		/*
		 * If we call mir_svc_idle_start() below, then
		 * clearing mir_inwservice here will also result in
		 * any thread waiting in mir_close() to be signaled.
		 */
		mir->mir_inwservice = 0;

	if (mir->mir_type != RPC_SERVER) {
		mutex_exit(&mir->mir_mutex);
		return;
	}

	/*
	 * If idle, we call mir_svc_idle_start to start the timer (or wake
	 * up a close).  Also make sure not to start the idle timer on the
	 * listener stream; doing so can cause nfsd to send an orderly
	 * release command on the listener stream.
	 */
	if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
		RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
		    "because mir slot is idle\n", (void *)q);
		mir_svc_idle_start(q, mir);
	}

	/*
	 * If outbound flow control has been relieved, then allow new
	 * inbound requests to be processed.
	 */
	if (mir->mir_hold_inbound) {
		mir->mir_hold_inbound = 0;
		qenable(RD(q));
	}
	mutex_exit(&mir->mir_mutex);
}

static void
mir_disconnect(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	switch (mir->mir_type) {
	case RPC_CLIENT:
		/*
		 * We are disconnecting, but not necessarily
		 * closing.  By not closing, we will fail to
		 * pick up a possibly changed global timeout value,
		 * unless we store it now.
		 */
		mir->mir_idle_timeout = clnt_idle_timeout;
		mir_clnt_idle_start(WR(q), mir);
		mutex_exit(&mir->mir_mutex);

		/*
		 * T_DISCON_REQ is passed to kRPC as an integer value
		 * (this is not a TPI message).  It is used as a
		 * convenient value to indicate a sanity check
		 * failure -- the same kRPC routine is also called
		 * for T_DISCON_INDs and T_ORDREL_INDs.
		 */
		clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
		break;

	case RPC_SERVER:
		mir->mir_svc_no_more_msgs = 1;
		mir_svc_idle_stop(WR(q), mir);
		mutex_exit(&mir->mir_mutex);
		RPCLOG(16, "mir_disconnect: telling "
		    "stream head listener to disconnect stream "
		    "(0x%p)\n", (void *) q);
		(void) mir_svc_policy_notify(q, 2);
		break;

	default:
		mutex_exit(&mir->mir_mutex);
		break;
	}
}
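
/*
 * For reference, the event values passed to mir_svc_policy_notify()
 * (as inferred from its callers): mir_timer() uses event 1 to ask the
 * stream head listener to close an idle stream, and mir_disconnect()
 * uses event 2 to request a disconnect.
 */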

/*
 * Sanity check the message length, and if it's too large, shut down the
 * connection.  Returns 1 if the connection is shut down; 0 otherwise.
 */
static int
mir_check_len(queue_t *q, mblk_t *head_mp)
{
	mir_t	*mir = q->q_ptr;
	uint_t	maxsize = 0;
	size_t	msg_len = msgdsize(head_mp);

	if (mir->mir_max_msg_sizep != NULL)
		maxsize = *mir->mir_max_msg_sizep;

	if (maxsize == 0 || msg_len <= maxsize)
		return (0);

	freemsg(head_mp);
	mir->mir_head_mp = NULL;
	mir->mir_tail_mp = NULL;
	mir->mir_frag_header = 0;
	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
		cmn_err(CE_NOTE,
		    "kRPC: record fragment from %s of size (%lu) exceeds "
		    "maximum (%u). Disconnecting",
		    (mir->mir_type == RPC_CLIENT) ? "server" :
		    (mir->mir_type == RPC_SERVER) ? "client" :
		    "test tool", msg_len, maxsize);
	}

	mir_disconnect(q, mir);
	return (1);
}
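
/*
 * A closing note on the limit checked above (a reading of the code as
 * written, not a tuning recommendation): mir_max_msg_sizep is pointed
 * at clnt_max_msg_size or svc_max_msg_size by the RPC_CLIENT/RPC_SERVER
 * ioctl handling in mir_wput_other(), and a limit of zero disables the
 * length check entirely, since mir_check_len() then always returns 0.
 */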