/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ /* Copyright (c) 1990 Mentat Inc. */ /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ /* All Rights Reserved */ #pragma ident "%Z%%M% %I% %E% SMI" /* * Kernel RPC filtering module */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* * This is the loadable module wrapper. */ #include #include #include extern struct streamtab rpcinfo; static struct fmodsw fsw = { "rpcmod", &rpcinfo, D_NEW|D_MP, }; /* * Module linkage information for the kernel. */ static struct modlstrmod modlstrmod = { &mod_strmodops, "rpc interface str mod", &fsw }; /* * For the RPC system call. */ static struct sysent rpcsysent = { 2, SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD, rpcsys }; static struct modlsys modlsys = { &mod_syscallops, "RPC syscall", &rpcsysent }; #ifdef _SYSCALL32_IMPL static struct modlsys modlsys32 = { &mod_syscallops32, "32-bit RPC syscall", &rpcsysent }; #endif /* _SYSCALL32_IMPL */ static struct modlinkage modlinkage = { MODREV_1, { &modlsys, #ifdef _SYSCALL32_IMPL &modlsys32, #endif &modlstrmod, NULL } }; int _init(void) { int error = 0; callb_id_t cid; int status; svc_init(); clnt_init(); cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc"); if (error = mod_install(&modlinkage)) { /* * Could not install module, cleanup previous * initialization work. */ clnt_fini(); if (cid != NULL) (void) callb_delete(cid); return (error); } /* * Load up the RDMA plugins and initialize the stats. Even if the * plugins loadup fails, but rpcmod was successfully installed the * counters still get initialized. */ rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL); mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL); mt_kstat_init(); /* * Get our identification into ldi. This is used for loading * other modules, e.g. rpcib. */ status = ldi_ident_from_mod(&modlinkage, &rpcmod_li); if (status != 0) { cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status); rpcmod_li = NULL; } return (error); } /* * The unload entry point fails, because we advertise entry points into * rpcmod from the rest of kRPC: rpcmod_release(). */ int _fini(void) { return (EBUSY); } int _info(struct modinfo *modinfop) { return (mod_info(&modlinkage, modinfop)); } extern int nulldev(); #define RPCMOD_ID 2049 int rmm_open(), rmm_close(); /* * To save instructions, since STREAMS ignores the return value * from these functions, they are defined as void here. Kind of icky, but... */ void rmm_rput(queue_t *, mblk_t *); void rmm_wput(queue_t *, mblk_t *); void rmm_rsrv(queue_t *); void rmm_wsrv(queue_t *); int rpcmodopen(), rpcmodclose(); void rpcmodrput(), rpcmodwput(); void rpcmodrsrv(), rpcmodwsrv(); static void rpcmodwput_other(queue_t *, mblk_t *); static int mir_close(queue_t *q); static int mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp); static void mir_rput(queue_t *q, mblk_t *mp); static void mir_rsrv(queue_t *q); static void mir_wput(queue_t *q, mblk_t *mp); static void mir_wsrv(queue_t *q); static struct module_info rpcmod_info = {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024}; /* * Read side has no service procedure. */ static struct qinit rpcmodrinit = { (int (*)())rmm_rput, (int (*)())rmm_rsrv, rmm_open, rmm_close, nulldev, &rpcmod_info, NULL }; /* * The write put procedure is simply putnext to conserve stack space. * The write service procedure is not used to queue data, but instead to * synchronize with flow control. */ static struct qinit rpcmodwinit = { (int (*)())rmm_wput, (int (*)())rmm_wsrv, rmm_open, rmm_close, nulldev, &rpcmod_info, NULL }; struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL }; struct xprt_style_ops { int (*xo_open)(); int (*xo_close)(); void (*xo_wput)(); void (*xo_wsrv)(); void (*xo_rput)(); void (*xo_rsrv)(); }; static struct xprt_style_ops xprt_clts_ops = { rpcmodopen, rpcmodclose, rpcmodwput, rpcmodwsrv, rpcmodrput, NULL }; static struct xprt_style_ops xprt_cots_ops = { mir_open, mir_close, mir_wput, mir_wsrv, mir_rput, mir_rsrv }; /* * Per rpcmod "slot" data structure. q->q_ptr points to one of these. */ struct rpcm { void *rm_krpc_cell; /* Reserved for use by KRPC */ struct xprt_style_ops *rm_ops; int rm_type; /* Client or server side stream */ #define RM_CLOSING 0x1 /* somebody is trying to close slot */ uint_t rm_state; /* state of the slot. see above */ uint_t rm_ref; /* cnt of external references to slot */ kmutex_t rm_lock; /* mutex protecting above fields */ kcondvar_t rm_cwait; /* condition for closing */ zoneid_t rm_zoneid; /* zone which pushed rpcmod */ }; struct temp_slot { void *cell; struct xprt_style_ops *ops; int type; mblk_t *info_ack; kmutex_t lock; kcondvar_t wait; }; void tmp_rput(queue_t *q, mblk_t *mp); struct xprt_style_ops tmpops = { NULL, NULL, putnext, NULL, tmp_rput, NULL }; void tmp_rput(queue_t *q, mblk_t *mp) { struct temp_slot *t = (struct temp_slot *)(q->q_ptr); struct T_info_ack *pptr; switch (mp->b_datap->db_type) { case M_PCPROTO: pptr = (struct T_info_ack *)mp->b_rptr; switch (pptr->PRIM_type) { case T_INFO_ACK: mutex_enter(&t->lock); t->info_ack = mp; cv_signal(&t->wait); mutex_exit(&t->lock); return; default: break; } default: break; } /* * Not an info-ack, so free it. This is ok because we should * not be receiving data until the open finishes: rpcmod * is pushed well before the end-point is bound to an address. */ freemsg(mp); } int rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp) { mblk_t *bp; struct temp_slot ts, *t; struct T_info_ack *pptr; int error = 0; int procson = 0; ASSERT(q != NULL); /* * Check for re-opens. */ if (q->q_ptr) { TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "q->qptr"); return (0); } t = &ts; bzero(t, sizeof (*t)); q->q_ptr = (void *)t; /* WR(q)->q_ptr = (void *)t; */ /* * Allocate the required messages upfront. */ if ((bp = allocb(sizeof (struct T_info_req) + sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) { return (ENOBUFS); } mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL); cv_init(&t->wait, NULL, CV_DEFAULT, NULL); t->ops = &tmpops; qprocson(q); procson = 1; bp->b_datap->db_type = M_PCPROTO; *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ; bp->b_wptr += sizeof (struct T_info_req); putnext(WR(q), bp); mutex_enter(&t->lock); while ((bp = t->info_ack) == NULL) { if (cv_wait_sig(&t->wait, &t->lock) == 0) { error = EINTR; break; } } mutex_exit(&t->lock); mutex_destroy(&t->lock); cv_destroy(&t->wait); if (error) goto out; pptr = (struct T_info_ack *)t->info_ack->b_rptr; if (pptr->SERV_type == T_CLTS) { error = rpcmodopen(q, devp, flag, sflag, crp); if (error == 0) { t = (struct temp_slot *)q->q_ptr; t->ops = &xprt_clts_ops; } } else { error = mir_open(q, devp, flag, sflag, crp); if (error == 0) { t = (struct temp_slot *)q->q_ptr; t->ops = &xprt_cots_ops; } } out: freemsg(bp); if (error && procson) qprocsoff(q); return (error); } void rmm_rput(queue_t *q, mblk_t *mp) { (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp); } void rmm_rsrv(queue_t *q) { (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q); } void rmm_wput(queue_t *q, mblk_t *mp) { (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp); } void rmm_wsrv(queue_t *q) { (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q); } int rmm_close(queue_t *q, int flag, cred_t *crp) { return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp)); } /* * rpcmodopen - open routine gets called when the module gets pushed * onto the stream. */ /*ARGSUSED*/ int rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp) { struct rpcm *rmp; extern void (*rpc_rele)(queue_t *, mblk_t *); static void rpcmod_release(queue_t *, mblk_t *); TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:"); /* * Initialize entry points to release a rpcmod slot (and an input * message if supplied) and to send an output message to the module * below rpcmod. */ if (rpc_rele == NULL) rpc_rele = rpcmod_release; /* * Only sufficiently privileged users can use this module, and it * is assumed that they will use this module properly, and NOT send * bulk data from downstream. */ if (secpolicy_rpcmod_open(crp) != 0) return (EPERM); /* * Allocate slot data structure. */ rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP); mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL); cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL); rmp->rm_zoneid = rpc_zoneid(); /* * slot type will be set by kRPC client and server ioctl's */ rmp->rm_type = 0; q->q_ptr = (void *)rmp; WR(q)->q_ptr = (void *)rmp; TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end"); return (0); } /* * rpcmodclose - This routine gets called when the module gets popped * off of the stream. */ /*ARGSUSED*/ int rpcmodclose(queue_t *q, int flag, cred_t *crp) { struct rpcm *rmp; ASSERT(q != NULL); rmp = (struct rpcm *)q->q_ptr; /* * Mark our state as closing. */ mutex_enter(&rmp->rm_lock); rmp->rm_state |= RM_CLOSING; /* * Check and see if there are any messages on the queue. If so, send * the messages, regardless whether the downstream module is ready to * accept data. */ if (rmp->rm_type == RPC_SERVER) { flushq(q, FLUSHDATA); qenable(WR(q)); if (rmp->rm_ref) { mutex_exit(&rmp->rm_lock); /* * call into SVC to clean the queue */ svc_queueclean(q); mutex_enter(&rmp->rm_lock); /* * Block while there are kRPC threads with a reference * to this message. */ while (rmp->rm_ref) cv_wait(&rmp->rm_cwait, &rmp->rm_lock); } mutex_exit(&rmp->rm_lock); /* * It is now safe to remove this queue from the stream. No kRPC * threads have a reference to the stream, and none ever will, * because RM_CLOSING is set. */ qprocsoff(q); /* Notify kRPC that this stream is going away. */ svc_queueclose(q); } else { mutex_exit(&rmp->rm_lock); qprocsoff(q); } q->q_ptr = NULL; WR(q)->q_ptr = NULL; mutex_destroy(&rmp->rm_lock); cv_destroy(&rmp->rm_cwait); kmem_free(rmp, sizeof (*rmp)); return (0); } #ifdef DEBUG int rpcmod_send_msg_up = 0; int rpcmod_send_uderr = 0; int rpcmod_send_dup = 0; int rpcmod_send_dup_cnt = 0; #endif /* * rpcmodrput - Module read put procedure. This is called from * the module, driver, or stream head downstream. */ void rpcmodrput(queue_t *q, mblk_t *mp) { struct rpcm *rmp; union T_primitives *pptr; int hdrsz; TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:"); ASSERT(q != NULL); rmp = (struct rpcm *)q->q_ptr; if (rmp->rm_type == 0) { freemsg(mp); return; } #ifdef DEBUG if (rpcmod_send_msg_up > 0) { mblk_t *nmp = copymsg(mp); if (nmp) { putnext(q, nmp); rpcmod_send_msg_up--; } } if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) { mblk_t *nmp; struct T_unitdata_ind *data; struct T_uderror_ind *ud; int d; data = (struct T_unitdata_ind *)mp->b_rptr; if (data->PRIM_type == T_UNITDATA_IND) { d = sizeof (*ud) - sizeof (*data); nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI); if (nmp) { ud = (struct T_uderror_ind *)nmp->b_rptr; ud->PRIM_type = T_UDERROR_IND; ud->DEST_length = data->SRC_length; ud->DEST_offset = data->SRC_offset + d; ud->OPT_length = data->OPT_length; ud->OPT_offset = data->OPT_offset + d; ud->ERROR_type = ENETDOWN; if (data->SRC_length) { bcopy(mp->b_rptr + data->SRC_offset, nmp->b_rptr + ud->DEST_offset, data->SRC_length); } if (data->OPT_length) { bcopy(mp->b_rptr + data->OPT_offset, nmp->b_rptr + ud->OPT_offset, data->OPT_length); } nmp->b_wptr += d; nmp->b_wptr += (mp->b_wptr - mp->b_rptr); nmp->b_datap->db_type = M_PROTO; putnext(q, nmp); rpcmod_send_uderr--; } } } #endif switch (mp->b_datap->db_type) { default: putnext(q, mp); break; case M_PROTO: case M_PCPROTO: ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t)); pptr = (union T_primitives *)mp->b_rptr; /* * Forward this message to krpc if it is data. */ if (pptr->type == T_UNITDATA_IND) { mblk_t *nmp; /* * Check if the module is being popped. */ mutex_enter(&rmp->rm_lock); if (rmp->rm_state & RM_CLOSING) { mutex_exit(&rmp->rm_lock); putnext(q, mp); break; } switch (rmp->rm_type) { case RPC_CLIENT: mutex_exit(&rmp->rm_lock); hdrsz = mp->b_wptr - mp->b_rptr; /* * Make sure the header is sane. */ if (hdrsz < TUNITDATAINDSZ || hdrsz < (pptr->unitdata_ind.OPT_length + pptr->unitdata_ind.OPT_offset) || hdrsz < (pptr->unitdata_ind.SRC_length + pptr->unitdata_ind.SRC_offset)) { freemsg(mp); return; } /* * Call clnt_clts_dispatch_notify, so that it can * pass the message to the proper caller. Don't * discard the header just yet since the client may * need the sender's address. */ clnt_clts_dispatch_notify(mp, hdrsz, rmp->rm_zoneid); return; case RPC_SERVER: /* * rm_krpc_cell is exclusively used by the kRPC * CLTS server */ if (rmp->rm_krpc_cell) { #ifdef DEBUG /* * Test duplicate request cache and * rm_ref count handling by sending a * duplicate every so often, if * desired. */ if (rpcmod_send_dup && rpcmod_send_dup_cnt++ % rpcmod_send_dup) nmp = copymsg(mp); else nmp = NULL; #endif /* * Raise the reference count on this * module to prevent it from being * popped before krpc generates the * reply. */ rmp->rm_ref++; mutex_exit(&rmp->rm_lock); /* * Submit the message to krpc. */ svc_queuereq(q, mp); #ifdef DEBUG /* * Send duplicate if we created one. */ if (nmp) { mutex_enter(&rmp->rm_lock); rmp->rm_ref++; mutex_exit(&rmp->rm_lock); svc_queuereq(q, nmp); } #endif } else { mutex_exit(&rmp->rm_lock); freemsg(mp); } return; default: mutex_exit(&rmp->rm_lock); freemsg(mp); return; } /* end switch(rmp->rm_type) */ } else if (pptr->type == T_UDERROR_IND) { mutex_enter(&rmp->rm_lock); hdrsz = mp->b_wptr - mp->b_rptr; /* * Make sure the header is sane */ if (hdrsz < TUDERRORINDSZ || hdrsz < (pptr->uderror_ind.OPT_length + pptr->uderror_ind.OPT_offset) || hdrsz < (pptr->uderror_ind.DEST_length + pptr->uderror_ind.DEST_offset)) { mutex_exit(&rmp->rm_lock); freemsg(mp); return; } /* * In the case where a unit data error has been * received, all we need to do is clear the message from * the queue. */ mutex_exit(&rmp->rm_lock); freemsg(mp); RPCLOG(32, "rpcmodrput: unitdata error received at " "%ld\n", gethrestime_sec()); return; } /* end else if (pptr->type == T_UDERROR_IND) */ putnext(q, mp); break; } /* end switch (mp->b_datap->db_type) */ TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END, "rpcmodrput_end:"); /* * Return codes are not looked at by the STREAMS framework. */ } /* * write put procedure */ void rpcmodwput(queue_t *q, mblk_t *mp) { struct rpcm *rmp; ASSERT(q != NULL); switch (mp->b_datap->db_type) { case M_PROTO: case M_PCPROTO: break; default: rpcmodwput_other(q, mp); return; } /* * Check to see if we can send the message downstream. */ if (canputnext(q)) { putnext(q, mp); return; } rmp = (struct rpcm *)q->q_ptr; ASSERT(rmp != NULL); /* * The first canputnext failed. Try again except this time with the * lock held, so that we can check the state of the stream to see if * it is closing. If either of these conditions evaluate to true * then send the meesage. */ mutex_enter(&rmp->rm_lock); if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) { mutex_exit(&rmp->rm_lock); putnext(q, mp); } else { /* * canputnext failed again and the stream is not closing. * Place the message on the queue and let the service * procedure handle the message. */ mutex_exit(&rmp->rm_lock); (void) putq(q, mp); } } static void rpcmodwput_other(queue_t *q, mblk_t *mp) { struct rpcm *rmp; struct iocblk *iocp; rmp = (struct rpcm *)q->q_ptr; ASSERT(rmp != NULL); switch (mp->b_datap->db_type) { case M_IOCTL: iocp = (struct iocblk *)mp->b_rptr; ASSERT(iocp != NULL); switch (iocp->ioc_cmd) { case RPC_CLIENT: case RPC_SERVER: mutex_enter(&rmp->rm_lock); rmp->rm_type = iocp->ioc_cmd; mutex_exit(&rmp->rm_lock); mp->b_datap->db_type = M_IOCACK; qreply(q, mp); return; default: /* * pass the ioctl downstream and hope someone * down there knows how to handle it. */ putnext(q, mp); return; } default: break; } /* * This is something we definitely do not know how to handle, just * pass the message downstream */ putnext(q, mp); } /* * Module write service procedure. This is called by downstream modules * for back enabling during flow control. */ void rpcmodwsrv(queue_t *q) { struct rpcm *rmp; mblk_t *mp = NULL; rmp = (struct rpcm *)q->q_ptr; ASSERT(rmp != NULL); /* * Get messages that may be queued and send them down stream */ while ((mp = getq(q)) != NULL) { /* * Optimize the service procedure for the server-side, by * avoiding a call to canputnext(). */ if (rmp->rm_type == RPC_SERVER || canputnext(q)) { putnext(q, mp); continue; } (void) putbq(q, mp); return; } } static void rpcmod_release(queue_t *q, mblk_t *bp) { struct rpcm *rmp; /* * For now, just free the message. */ if (bp) freemsg(bp); rmp = (struct rpcm *)q->q_ptr; mutex_enter(&rmp->rm_lock); rmp->rm_ref--; if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) { cv_broadcast(&rmp->rm_cwait); } mutex_exit(&rmp->rm_lock); } /* * This part of rpcmod is pushed on a connection-oriented transport for use * by RPC. It serves to bypass the Stream head, implements * the record marking protocol, and dispatches incoming RPC messages. */ /* Default idle timer values */ #define MIR_CLNT_IDLE_TIMEOUT (5 * (60 * 1000L)) /* 5 minutes */ #define MIR_SVC_IDLE_TIMEOUT (6 * (60 * 1000L)) /* 6 minutes */ #define MIR_SVC_ORDREL_TIMEOUT (10 * (60 * 1000L)) /* 10 minutes */ #define MIR_LASTFRAG 0x80000000 /* Record marker */ #define DLEN(mp) (mp->b_cont ? msgdsize(mp) : (mp->b_wptr - mp->b_rptr)) typedef struct mir_s { void *mir_krpc_cell; /* Reserved for KRPC use. This field */ /* must be first in the structure. */ struct xprt_style_ops *rm_ops; int mir_type; /* Client or server side stream */ mblk_t *mir_head_mp; /* RPC msg in progress */ /* * mir_head_mp points the first mblk being collected in * the current RPC message. Record headers are removed * before data is linked into mir_head_mp. */ mblk_t *mir_tail_mp; /* Last mblk in mir_head_mp */ /* * mir_tail_mp points to the last mblk in the message * chain starting at mir_head_mp. It is only valid * if mir_head_mp is non-NULL and is used to add new * data blocks to the end of chain quickly. */ int32_t mir_frag_len; /* Bytes seen in the current frag */ /* * mir_frag_len starts at -4 for beginning of each fragment. * When this length is negative, it indicates the number of * bytes that rpcmod needs to complete the record marker * header. When it is positive or zero, it holds the number * of bytes that have arrived for the current fragment and * are held in mir_header_mp. */ int32_t mir_frag_header; /* * Fragment header as collected for the current fragment. * It holds the last-fragment indicator and the number * of bytes in the fragment. */ unsigned int mir_ordrel_pending : 1, /* Sent T_ORDREL_REQ */ mir_hold_inbound : 1, /* Hold inbound messages on server */ /* side until outbound flow control */ /* is relieved. */ mir_closing : 1, /* The stream is being closed */ mir_inrservice : 1, /* data queued or rd srv proc running */ mir_inwservice : 1, /* data queued or wr srv proc running */ mir_inwflushdata : 1, /* flush M_DATAs when srv runs */ /* * On client streams, mir_clntreq is 0 or 1; it is set * to 1 whenever a new request is sent out (mir_wput) * and cleared when the timer fires (mir_timer). If * the timer fires with this value equal to 0, then the * stream is considered idle and KRPC is notified. */ mir_clntreq : 1, /* * On server streams, stop accepting messages */ mir_svc_no_more_msgs : 1, mir_listen_stream : 1, /* listen end point */ mir_unused : 1, /* no longer used */ mir_timer_call : 1, mir_junk_fill_thru_bit_31 : 21; int mir_setup_complete; /* server has initialized everything */ timeout_id_t mir_timer_id; /* Timer for idle checks */ clock_t mir_idle_timeout; /* Allowed idle time before shutdown */ /* * This value is copied from clnt_idle_timeout or * svc_idle_timeout during the appropriate ioctl. * Kept in milliseconds */ clock_t mir_use_timestamp; /* updated on client with each use */ /* * This value is set to lbolt * every time a client stream sends or receives data. * Even if the timer message arrives, we don't shutdown * client unless: * lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp. * This value is kept in HZ. */ uint_t *mir_max_msg_sizep; /* Reference to sanity check size */ /* * This pointer is set to &clnt_max_msg_size or * &svc_max_msg_size during the appropriate ioctl. */ zoneid_t mir_zoneid; /* zone which pushed rpcmod */ /* Server-side fields. */ int mir_ref_cnt; /* Reference count: server side only */ /* counts the number of references */ /* that a kernel RPC server thread */ /* (see svc_run()) has on this rpcmod */ /* slot. Effectively, it is the */ /* number * of unprocessed messages */ /* that have been passed up to the */ /* KRPC layer */ mblk_t *mir_svc_pend_mp; /* Pending T_ORDREL_IND or */ /* T_DISCON_IND */ /* * these fields are for both client and server, but for debugging, * it is easier to have these last in the structure. */ kmutex_t mir_mutex; /* Mutex and condvar for close */ kcondvar_t mir_condvar; /* synchronization. */ kcondvar_t mir_timer_cv; /* Timer routine sync. */ } mir_t; #define MIR_SVC_QUIESCED(mir) \ (mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0) #define MIR_CLEAR_INRSRV(mir_ptr) { \ (mir_ptr)->mir_inrservice = 0; \ if ((mir_ptr)->mir_type == RPC_SERVER && \ (mir_ptr)->mir_closing) \ cv_signal(&(mir_ptr)->mir_condvar); \ } /* * Don't block service procedure (and mir_close) if * we are in the process of closing. */ #define MIR_WCANPUTNEXT(mir_ptr, write_q) \ (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1)) static int mir_clnt_dup_request(queue_t *q, mblk_t *mp); static void mir_rput_proto(queue_t *q, mblk_t *mp); static int mir_svc_policy_notify(queue_t *q, int event); static void mir_svc_release(queue_t *wq, mblk_t *mp); static void mir_svc_start(queue_t *wq); static void mir_svc_idle_start(queue_t *, mir_t *); static void mir_svc_idle_stop(queue_t *, mir_t *); static void mir_svc_start_close(queue_t *, mir_t *); static void mir_clnt_idle_do_stop(queue_t *); static void mir_clnt_idle_stop(queue_t *, mir_t *); static void mir_clnt_idle_start(queue_t *, mir_t *); static void mir_wput(queue_t *q, mblk_t *mp); static void mir_wput_other(queue_t *q, mblk_t *mp); static void mir_wsrv(queue_t *q); static void mir_disconnect(queue_t *, mir_t *ir); static int mir_check_len(queue_t *, int32_t, mblk_t *); static void mir_timer(void *); extern void (*mir_rele)(queue_t *, mblk_t *); extern void (*mir_start)(queue_t *); extern void (*clnt_stop_idle)(queue_t *); clock_t clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT; clock_t svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT; /* * Timeout for subsequent notifications of idle connection. This is * typically used to clean up after a wedged orderly release. */ clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */ extern uint_t *clnt_max_msg_sizep; extern uint_t *svc_max_msg_sizep; uint_t clnt_max_msg_size = RPC_MAXDATASIZE; uint_t svc_max_msg_size = RPC_MAXDATASIZE; uint_t mir_krpc_cell_null; static void mir_timer_stop(mir_t *mir) { timeout_id_t tid; ASSERT(MUTEX_HELD(&mir->mir_mutex)); /* * Since the mir_mutex lock needs to be released to call * untimeout(), we need to make sure that no other thread * can start/stop the timer (changing mir_timer_id) during * that time. The mir_timer_call bit and the mir_timer_cv * condition variable are used to synchronize this. Setting * mir_timer_call also tells mir_timer() (refer to the comments * in mir_timer()) that it does not need to do anything. */ while (mir->mir_timer_call) cv_wait(&mir->mir_timer_cv, &mir->mir_mutex); mir->mir_timer_call = B_TRUE; if ((tid = mir->mir_timer_id) != 0) { mir->mir_timer_id = 0; mutex_exit(&mir->mir_mutex); (void) untimeout(tid); mutex_enter(&mir->mir_mutex); } mir->mir_timer_call = B_FALSE; cv_broadcast(&mir->mir_timer_cv); } static void mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl) { timeout_id_t tid; ASSERT(MUTEX_HELD(&mir->mir_mutex)); while (mir->mir_timer_call) cv_wait(&mir->mir_timer_cv, &mir->mir_mutex); mir->mir_timer_call = B_TRUE; if ((tid = mir->mir_timer_id) != 0) { mutex_exit(&mir->mir_mutex); (void) untimeout(tid); mutex_enter(&mir->mir_mutex); } /* Only start the timer when it is not closing. */ if (!mir->mir_closing) { mir->mir_timer_id = timeout(mir_timer, q, MSEC_TO_TICK(intrvl)); } mir->mir_timer_call = B_FALSE; cv_broadcast(&mir->mir_timer_cv); } static int mir_clnt_dup_request(queue_t *q, mblk_t *mp) { mblk_t *mp1; uint32_t new_xid; uint32_t old_xid; ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex)); new_xid = BE32_TO_U32(&mp->b_rptr[4]); /* * This loop is a bit tacky -- it walks the STREAMS list of * flow-controlled messages. */ if ((mp1 = q->q_first) != NULL) { do { old_xid = BE32_TO_U32(&mp1->b_rptr[4]); if (new_xid == old_xid) return (1); } while ((mp1 = mp1->b_next) != NULL); } return (0); } static int mir_close(queue_t *q) { mir_t *mir; mblk_t *mp; bool_t queue_cleaned = FALSE; RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q); mir = (mir_t *)q->q_ptr; ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); mutex_enter(&mir->mir_mutex); if ((mp = mir->mir_head_mp) != NULL) { mir->mir_head_mp = (mblk_t *)0; freemsg(mp); } /* * Set mir_closing so we get notified when MIR_SVC_QUIESCED() * is TRUE. And mir_timer_start() won't start the timer again. */ mir->mir_closing = B_TRUE; mir_timer_stop(mir); if (mir->mir_type == RPC_SERVER) { flushq(q, FLUSHDATA); /* Ditch anything waiting on read q */ /* * This will prevent more requests from arriving and * will force rpcmod to ignore flow control. */ mir_svc_start_close(WR(q), mir); while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) { if (mir->mir_ref_cnt && !mir->mir_inrservice && (queue_cleaned == FALSE)) { /* * call into SVC to clean the queue */ mutex_exit(&mir->mir_mutex); svc_queueclean(q); queue_cleaned = TRUE; mutex_enter(&mir->mir_mutex); continue; } /* * Bugid 1253810 - Force the write service * procedure to send its messages, regardless * whether the downstream module is ready * to accept data. */ if (mir->mir_inwservice == 1) qenable(WR(q)); cv_wait(&mir->mir_condvar, &mir->mir_mutex); } mutex_exit(&mir->mir_mutex); qprocsoff(q); /* Notify KRPC that this stream is going away. */ svc_queueclose(q); } else { mutex_exit(&mir->mir_mutex); qprocsoff(q); } mutex_destroy(&mir->mir_mutex); cv_destroy(&mir->mir_condvar); cv_destroy(&mir->mir_timer_cv); kmem_free(mir, sizeof (mir_t)); return (0); } /* * This is server side only (RPC_SERVER). * * Exit idle mode. */ static void mir_svc_idle_stop(queue_t *q, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); ASSERT((q->q_flag & QREADR) == 0); ASSERT(mir->mir_type == RPC_SERVER); RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q); mir_timer_stop(mir); } /* * This is server side only (RPC_SERVER). * * Start idle processing, which will include setting idle timer if the * stream is not being closed. */ static void mir_svc_idle_start(queue_t *q, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); ASSERT((q->q_flag & QREADR) == 0); ASSERT(mir->mir_type == RPC_SERVER); RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q); /* * Don't re-start idle timer if we are closing queues. */ if (mir->mir_closing) { RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n", (void *)q); /* * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED() * is true. When it is true, and we are in the process of * closing the stream, signal any thread waiting in * mir_close(). */ if (mir->mir_inwservice == 0) cv_signal(&mir->mir_condvar); } else { RPCLOG(16, "mir_svc_idle_start - reset %s timer\n", mir->mir_ordrel_pending ? "ordrel" : "normal"); /* * Normal condition, start the idle timer. If an orderly * release has been sent, set the timeout to wait for the * client to close its side of the connection. Otherwise, * use the normal idle timeout. */ mir_timer_start(q, mir, mir->mir_ordrel_pending ? svc_ordrel_timeout : mir->mir_idle_timeout); } } /* ARGSUSED */ static int mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp) { mir_t *mir; RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q); /* Set variables used directly by KRPC. */ if (!mir_rele) mir_rele = mir_svc_release; if (!mir_start) mir_start = mir_svc_start; if (!clnt_stop_idle) clnt_stop_idle = mir_clnt_idle_do_stop; if (!clnt_max_msg_sizep) clnt_max_msg_sizep = &clnt_max_msg_size; if (!svc_max_msg_sizep) svc_max_msg_sizep = &svc_max_msg_size; /* Allocate a zero'ed out mir structure for this stream. */ mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP); /* * We set hold inbound here so that incoming messages will * be held on the read-side queue until the stream is completely * initialized with a RPC_CLIENT or RPC_SERVER ioctl. During * the ioctl processing, the flag is cleared and any messages that * arrived between the open and the ioctl are delivered to KRPC. * * Early data should never arrive on a client stream since * servers only respond to our requests and we do not send any. * until after the stream is initialized. Early data is * very common on a server stream where the client will start * sending data as soon as the connection is made (and this * is especially true with TCP where the protocol accepts the * connection before nfsd or KRPC is notified about it). */ mir->mir_hold_inbound = 1; /* * Start the record marker looking for a 4-byte header. When * this length is negative, it indicates that rpcmod is looking * for bytes to consume for the record marker header. When it * is positive, it holds the number of bytes that have arrived * for the current fragment and are being held in mir_header_mp. */ mir->mir_frag_len = -(int32_t)sizeof (uint32_t); mir->mir_zoneid = rpc_zoneid(); mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL); cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL); cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL); q->q_ptr = (char *)mir; WR(q)->q_ptr = (char *)mir; /* * We noenable the read-side queue because we don't want it * automatically enabled by putq. We enable it explicitly * in mir_wsrv when appropriate. (See additional comments on * flow control at the beginning of mir_rsrv.) */ noenable(q); qprocson(q); return (0); } /* * Read-side put routine for both the client and server side. Does the * record marking for incoming RPC messages, and when complete, dispatches * the message to either the client or server. */ static void mir_do_rput(queue_t *q, mblk_t *mp, int srv) { mblk_t *cont_mp; int excess; int32_t frag_len; int32_t frag_header; mblk_t *head_mp; int len; mir_t *mir; mblk_t *mp1; unsigned char *rptr; mblk_t *tail_mp; unsigned char *wptr; boolean_t stop_timer = B_FALSE; mir = (mir_t *)q->q_ptr; ASSERT(mir != NULL); /* * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER * with the corresponding ioctl, then don't accept * any inbound data. This should never happen for streams * created by nfsd or client-side KRPC because they are careful * to set the mode of the stream before doing anything else. */ if (mir->mir_type == 0) { freemsg(mp); return; } ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); switch (mp->b_datap->db_type) { case M_DATA: break; case M_PROTO: case M_PCPROTO: rptr = mp->b_rptr; if (mp->b_wptr - rptr < sizeof (uint32_t)) { RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n", (int)(mp->b_wptr - rptr)); freemsg(mp); return; } if (((union T_primitives *)rptr)->type != T_DATA_IND) { mir_rput_proto(q, mp); return; } /* Throw away the T_DATA_IND block and continue with data. */ mp1 = mp; mp = mp->b_cont; freeb(mp1); break; case M_SETOPTS: /* * If a module on the stream is trying set the Stream head's * high water mark, then set our hiwater to the requested * value. We are the "stream head" for all inbound * data messages since messages are passed directly to KRPC. */ if ((mp->b_wptr - mp->b_rptr) >= sizeof (struct stroptions)) { struct stroptions *stropts; stropts = (struct stroptions *)mp->b_rptr; if ((stropts->so_flags & SO_HIWAT) && !(stropts->so_flags & SO_BAND)) { (void) strqset(q, QHIWAT, 0, stropts->so_hiwat); } } putnext(q, mp); return; case M_FLUSH: RPCLOG(32, "mir_do_rput: ignoring M_FLUSH on q 0x%p. ", (void *)q); RPCLOG(32, "M_FLUSH is %x\n", (uint_t)*mp->b_rptr); putnext(q, mp); return; default: putnext(q, mp); return; } mutex_enter(&mir->mir_mutex); /* * If this connection is closing, don't accept any new messages. */ if (mir->mir_svc_no_more_msgs) { ASSERT(mir->mir_type == RPC_SERVER); mutex_exit(&mir->mir_mutex); freemsg(mp); return; } /* Get local copies for quicker access. */ frag_len = mir->mir_frag_len; frag_header = mir->mir_frag_header; head_mp = mir->mir_head_mp; tail_mp = mir->mir_tail_mp; /* Loop, processing each message block in the mp chain separately. */ do { /* * cont_mp is used in the do/while condition below to * walk to the next block in the STREAMS message. * mp->b_cont may be nil'ed during processing so we * can't rely on it to find the next block. */ cont_mp = mp->b_cont; /* * Get local copies of rptr and wptr for our processing. * These always point into "mp" (the current block being * processed), but rptr is updated as we consume any * record header in this message, and wptr is updated to * point to the end of the data for the current fragment, * if it ends in this block. The main point is that * they are not always the same as b_rptr and b_wptr. * b_rptr and b_wptr will be updated when appropriate. */ rptr = mp->b_rptr; wptr = mp->b_wptr; same_mblk:; len = (int)(wptr - rptr); if (len <= 0) { /* * If we have processed all of the data in the message * or the block is empty to begin with, then we're * done with this block and can go on to cont_mp, * if there is one. * * First, we check to see if the current block is * now zero-length and, if so, we free it. * This happens when either the block was empty * to begin with or we consumed all of the data * for the record marking header. */ if (rptr <= mp->b_rptr) { /* * If head_mp is non-NULL, add cont_mp to the * mblk list. XXX But there is a possibility * that tail_mp = mp or even head_mp = mp XXX */ if (head_mp) { if (head_mp == mp) head_mp = NULL; else if (tail_mp != mp) { ASSERT((tail_mp->b_cont == NULL) || (tail_mp->b_cont == mp)); tail_mp->b_cont = cont_mp; /* * It's possible that, because * of a very short mblk (0-3 * bytes), we've ended up here * and that cont_mp could be * NULL (if we're at the end * of an mblk chain). If so, * don't set tail_mp to * cont_mp, because the next * time we access it, we'll * dereference a NULL pointer * and crash. Just leave * tail_mp pointing at the * current end of chain. */ if (cont_mp) tail_mp = cont_mp; } else { mblk_t *smp = head_mp; while ((smp->b_cont != NULL) && (smp->b_cont != mp)) smp = smp->b_cont; smp->b_cont = cont_mp; /* * Don't set tail_mp to cont_mp * if it's NULL. Instead, set * tail_mp to smp, which is the * end of the chain starting * at head_mp. */ if (cont_mp) tail_mp = cont_mp; else tail_mp = smp; } } freeb(mp); } continue; } /* * frag_len starts at -4 and is incremented past the record * marking header to 0, and then becomes positive as real data * bytes are received for the message. While frag_len is less * than zero, we need more bytes for the record marking * header. */ if (frag_len < 0) { uchar_t *up = rptr; /* * Collect as many bytes as we need for the record * marking header and that are available in this block. */ do { --len; frag_len++; frag_header <<= 8; frag_header += (*up++ & 0xFF); } while (len > 0 && frag_len < 0); if (rptr == mp->b_rptr) { /* * The record header is located at the * beginning of the block, so just walk * b_rptr past it. */ mp->b_rptr = rptr = up; } else { /* * The record header is located in the middle * of a block, so copy any remaining data up. * This happens when an RPC message is * fragmented into multiple pieces and * a middle (or end) fragment immediately * follows a previous fragment in the same * message block. */ wptr = &rptr[len]; mp->b_wptr = wptr; if (len) { RPCLOG(32, "mir_do_rput: copying %d " "bytes of data up", len); RPCLOG(32, " db_ref %d\n", (uint_t)mp->b_datap->db_ref); bcopy(up, rptr, len); } } /* * If we haven't received the complete record header * yet, then loop around to get the next block in the * STREAMS message. The logic at same_mblk label will * free the current block if it has become empty. */ if (frag_len < 0) { RPCLOG(32, "mir_do_rput: frag_len is still < 0 " "(%d)", len); goto same_mblk; } #ifdef RPCDEBUG if ((frag_header & MIR_LASTFRAG) == 0) { RPCLOG0(32, "mir_do_rput: multi-fragment " "record\n"); } { uint_t l = frag_header & ~MIR_LASTFRAG; if (l != 0 && mir->mir_max_msg_sizep && l >= *mir->mir_max_msg_sizep) { RPCLOG(32, "mir_do_rput: fragment size" " (%d) > maximum", l); RPCLOG(32, " (%u)\n", *mir->mir_max_msg_sizep); } } #endif /* * At this point we have retrieved the complete record * header for this fragment. If the current block is * empty, then we need to free it and walk to the next * block. */ if (mp->b_rptr >= wptr) { /* * If this is not the last fragment or if we * have not received all the data for this * RPC message, then loop around to the next * block. */ if (!(frag_header & MIR_LASTFRAG) || (frag_len - (frag_header & ~MIR_LASTFRAG)) || !head_mp) goto same_mblk; /* * Quick walk to next block in the * STREAMS message. */ freeb(mp); continue; } } /* * We've collected the complete record header. The data * in the current block is added to the end of the RPC * message. Note that tail_mp is the same as mp after * this linkage. */ if (!head_mp) head_mp = mp; else if (tail_mp != mp) { ASSERT((tail_mp->b_cont == NULL) || (tail_mp->b_cont == mp)); tail_mp->b_cont = mp; } tail_mp = mp; /* * Add the length of this block to the accumulated * fragment length. */ frag_len += len; excess = frag_len - (frag_header & ~MIR_LASTFRAG); /* * If we have not received all the data for this fragment, * then walk to the next block. */ if (excess < 0) continue; /* * We've received a complete fragment, so reset frag_len * for the next one. */ frag_len = -(int32_t)sizeof (uint32_t); /* * Update rptr to point to the beginning of the next * fragment in this block. If there are no more bytes * in the block (excess is 0), then rptr will be equal * to wptr. */ rptr = wptr - excess; /* * Now we check to see if this fragment is the last one in * the RPC message. */ if (!(frag_header & MIR_LASTFRAG)) { /* * This isn't the last one, so start processing the * next fragment. */ frag_header = 0; /* * If excess is 0, the next fragment * starts at the beginning of the next block -- * we "continue" to the end of the while loop and * walk to cont_mp. */ if (excess == 0) continue; RPCLOG0(32, "mir_do_rput: multi-fragment message with " "two or more fragments in one mblk\n"); /* * If excess is non-0, then the next fragment starts * in this block. rptr points to the beginning * of the next fragment and we "goto same_mblk" * to continue processing. */ goto same_mblk; } /* * We've got a complete RPC message. Before passing it * upstream, check to see if there is extra data in this * message block. If so, then we separate the excess * from the complete message. The excess data is processed * after the current message goes upstream. */ if (excess > 0) { RPCLOG(32, "mir_do_rput: end of record, but excess " "data (%d bytes) in this mblk. dupb/copyb " "needed\n", excess); /* Duplicate only the overlapping block. */ mp1 = dupb(tail_mp); /* * dupb() might have failed due to ref count wrap around * so try a copyb(). */ if (mp1 == NULL) mp1 = copyb(tail_mp); /* * Do not use bufcall() to schedule a "buffer * availability event." The reason is that * bufcall() has problems. For example, if memory * runs out, bufcall() itself will fail since it * needs to allocate memory. The most appropriate * action right now is to disconnect this connection * as the system is under stress. We should try to * free up resources. */ if (mp1 == NULL) { freemsg(head_mp); RPCLOG0(1, "mir_do_rput: dupb/copyb failed\n"); mir->mir_frag_header = 0; mir->mir_frag_len = -(int)sizeof (uint32_t); mir->mir_head_mp = NULL; mir->mir_tail_mp = NULL; mir_disconnect(q, mir); return; } /* * The new message block is linked with the * continuation block in cont_mp. We then point * cont_mp to the new block so that we will * process it next. */ mp1->b_cont = cont_mp; cont_mp = mp1; /* * Data in the new block begins at the * next fragment (rptr). */ cont_mp->b_rptr += (rptr - tail_mp->b_rptr); ASSERT(cont_mp->b_rptr >= cont_mp->b_datap->db_base); ASSERT(cont_mp->b_rptr <= cont_mp->b_wptr); /* Data in the current fragment ends at rptr. */ tail_mp->b_wptr = rptr; ASSERT(tail_mp->b_wptr <= tail_mp->b_datap->db_lim); ASSERT(tail_mp->b_wptr >= tail_mp->b_rptr); } /* tail_mp is the last block with data for this RPC message. */ tail_mp->b_cont = NULL; /* Pass the RPC message to the current consumer. */ switch (mir->mir_type) { case RPC_CLIENT: if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) { /* * Mark this stream as active. This marker * is used in mir_timer(). */ mir->mir_clntreq = 1; mir->mir_use_timestamp = lbolt; } else freemsg(head_mp); break; case RPC_SERVER: /* * Check for flow control before passing the * message to KRPC. */ if (!mir->mir_hold_inbound) { if (mir->mir_krpc_cell) { /* * If the reference count is 0 * (not including this request), * then the stream is transitioning * from idle to non-idle. In this case, * we cancel the idle timer. */ if (mir->mir_ref_cnt++ == 0) stop_timer = B_TRUE; if (mir_check_len(q, (int32_t)msgdsize(mp), mp)) return; svc_queuereq(q, head_mp); /* to KRPC */ } else { /* * Count # of times this happens. Should be * never, but experience shows otherwise. */ mir_krpc_cell_null++; freemsg(head_mp); } } else { /* * If the outbound side of the stream is * flow controlled, then hold this message * until client catches up. mir_hold_inbound * is set in mir_wput and cleared in mir_wsrv. */ if (srv) (void) putbq(q, head_mp); else (void) putq(q, head_mp); mir->mir_inrservice = B_TRUE; } break; default: RPCLOG(1, "mir_rput: unknown mir_type %d\n", mir->mir_type); freemsg(head_mp); break; } /* * Reset head_mp and frag_header since we're starting on a * new RPC fragment and message. */ head_mp = NULL; tail_mp = NULL; frag_header = 0; } while ((mp = cont_mp) != NULL); /* * Do a sanity check on the message length. If this message is * getting excessively large, shut down the connection. */ if (head_mp != NULL && mir->mir_setup_complete && mir_check_len(q, frag_len, head_mp)) return; /* Save our local copies back in the mir structure. */ mir->mir_frag_header = frag_header; mir->mir_frag_len = frag_len; mir->mir_head_mp = head_mp; mir->mir_tail_mp = tail_mp; /* * The timer is stopped after the whole message chain is processed. * The reason is that stopping the timer releases the mir_mutex * lock temporarily. This means that the request can be serviced * while we are still processing the message chain. This is not * good. So we stop the timer here instead. * * Note that if the timer fires before we stop it, it will not * do any harm as MIR_SVC_QUIESCED() is false and mir_timer() * will just return; */ if (stop_timer) { RPCLOG(16, "mir_do_rput stopping idle timer on 0x%p because " "ref cnt going to non zero\n", (void *) WR(q)); mir_svc_idle_stop(WR(q), mir); } mutex_exit(&mir->mir_mutex); } static void mir_rput(queue_t *q, mblk_t *mp) { mir_do_rput(q, mp, 0); } static void mir_rput_proto(queue_t *q, mblk_t *mp) { mir_t *mir = (mir_t *)q->q_ptr; uint32_t type; uint32_t reason = 0; ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); type = ((union T_primitives *)mp->b_rptr)->type; switch (mir->mir_type) { case RPC_CLIENT: switch (type) { case T_DISCON_IND: reason = ((struct T_discon_ind *)(mp->b_rptr))->DISCON_reason; /*FALLTHROUGH*/ case T_ORDREL_IND: mutex_enter(&mir->mir_mutex); if (mir->mir_head_mp) { freemsg(mir->mir_head_mp); mir->mir_head_mp = (mblk_t *)0; mir->mir_tail_mp = (mblk_t *)0; } /* * We are disconnecting, but not necessarily * closing. By not closing, we will fail to * pick up a possibly changed global timeout value, * unless we store it now. */ mir->mir_idle_timeout = clnt_idle_timeout; mir_clnt_idle_stop(WR(q), mir); /* * Even though we are unconnected, we still * leave the idle timer going on the client. The * reason for is that if we've disconnected due * to a server-side disconnect, reset, or connection * timeout, there is a possibility the client may * retry the RPC request. This retry needs to done on * the same bound address for the server to interpret * it as such. However, we don't want * to wait forever for that possibility. If the * end-point stays unconnected for mir_idle_timeout * units of time, then that is a signal to the * connection manager to give up waiting for the * application (eg. NFS) to send a retry. */ mir_clnt_idle_start(WR(q), mir); mutex_exit(&mir->mir_mutex); clnt_dispatch_notifyall(WR(q), type, reason); freemsg(mp); return; case T_ERROR_ACK: { struct T_error_ack *terror; terror = (struct T_error_ack *)mp->b_rptr; RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p", (void *)q); RPCLOG(1, " ERROR_prim: %s,", rpc_tpiprim2name(terror->ERROR_prim)); RPCLOG(1, " TLI_error: %s,", rpc_tpierr2name(terror->TLI_error)); RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error); if (terror->ERROR_prim == T_DISCON_REQ) { clnt_dispatch_notifyall(WR(q), type, reason); freemsg(mp); return; } else { if (clnt_dispatch_notifyconn(WR(q), mp)) return; } break; } case T_OK_ACK: { struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr; if (tok->CORRECT_prim == T_DISCON_REQ) { clnt_dispatch_notifyall(WR(q), type, reason); freemsg(mp); return; } else { if (clnt_dispatch_notifyconn(WR(q), mp)) return; } break; } case T_CONN_CON: case T_INFO_ACK: case T_OPTMGMT_ACK: if (clnt_dispatch_notifyconn(WR(q), mp)) return; break; case T_BIND_ACK: break; default: RPCLOG(1, "mir_rput: unexpected message %d " "for KRPC client\n", ((union T_primitives *)mp->b_rptr)->type); break; } break; case RPC_SERVER: switch (type) { case T_BIND_ACK: { struct T_bind_ack *tbind; /* * If this is a listening stream, then shut * off the idle timer. */ tbind = (struct T_bind_ack *)mp->b_rptr; if (tbind->CONIND_number > 0) { mutex_enter(&mir->mir_mutex); mir_svc_idle_stop(WR(q), mir); /* * mark this as a listen endpoint * for special handling. */ mir->mir_listen_stream = 1; mutex_exit(&mir->mir_mutex); } break; } case T_DISCON_IND: case T_ORDREL_IND: RPCLOG(16, "mir_rput_proto: got %s indication\n", type == T_DISCON_IND ? "disconnect" : "orderly release"); /* * For listen endpoint just pass * on the message. */ if (mir->mir_listen_stream) break; mutex_enter(&mir->mir_mutex); /* * If client wants to break off connection, record * that fact. */ mir_svc_start_close(WR(q), mir); /* * If we are idle, then send the orderly release * or disconnect indication to nfsd. */ if (MIR_SVC_QUIESCED(mir)) { mutex_exit(&mir->mir_mutex); break; } RPCLOG(16, "mir_rput_proto: not idle, so " "disconnect/ord rel indication not passed " "upstream on 0x%p\n", (void *)q); /* * Hold the indication until we get idle * If there already is an indication stored, * replace it if the new one is a disconnect. The * reasoning is that disconnection takes less time * to process, and once a client decides to * disconnect, we should do that. */ if (mir->mir_svc_pend_mp) { if (type == T_DISCON_IND) { RPCLOG(16, "mir_rput_proto: replacing" " held disconnect/ord rel" " indication with disconnect on" " 0x%p\n", (void *)q); freemsg(mir->mir_svc_pend_mp); mir->mir_svc_pend_mp = mp; } else { RPCLOG(16, "mir_rput_proto: already " "held a disconnect/ord rel " "indication. freeing ord rel " "ind on 0x%p\n", (void *)q); freemsg(mp); } } else mir->mir_svc_pend_mp = mp; mutex_exit(&mir->mir_mutex); return; default: /* nfsd handles server-side non-data messages. */ break; } break; default: break; } putnext(q, mp); } /* * The server-side read queues are used to hold inbound messages while * outbound flow control is exerted. When outbound flow control is * relieved, mir_wsrv qenables the read-side queue. Read-side queues * are not enabled by STREAMS and are explicitly noenable'ed in mir_open. * * For the server side, we have two types of messages queued. The first type * are messages that are ready to be XDR decoded and and then sent to the * RPC program's dispatch routine. The second type are "raw" messages that * haven't been processed, i.e. assembled from rpc record fragements into * full requests. The only time we will see the second type of message * queued is if we have a memory allocation failure while processing a * a raw message. The field mir_first_non_processed_mblk will mark the * first such raw message. So the flow for server side is: * * - send processed queued messages to kRPC until we run out or find * one that needs additional processing because we were short on memory * earlier * - process a message that was deferred because of lack of * memory * - continue processing messages until the queue empties or we * have to stop because of lack of memory * - during each of the above phase, if the queue is empty and * there are no pending messages that were passed to the RPC * layer, send upstream the pending disconnect/ordrel indication if * there is one * * The read-side queue is also enabled by a bufcall callback if dupmsg * fails in mir_rput. */ static void mir_rsrv(queue_t *q) { mir_t *mir; mblk_t *mp; mblk_t *cmp = NULL; boolean_t stop_timer = B_FALSE; mir = (mir_t *)q->q_ptr; mutex_enter(&mir->mir_mutex); mp = NULL; switch (mir->mir_type) { case RPC_SERVER: if (mir->mir_ref_cnt == 0) mir->mir_hold_inbound = 0; if (mir->mir_hold_inbound) { ASSERT(cmp == NULL); if (q->q_first == NULL) { MIR_CLEAR_INRSRV(mir); if (MIR_SVC_QUIESCED(mir)) { cmp = mir->mir_svc_pend_mp; mir->mir_svc_pend_mp = NULL; } } mutex_exit(&mir->mir_mutex); if (cmp != NULL) { RPCLOG(16, "mir_rsrv: line %d: sending a held " "disconnect/ord rel indication upstream\n", __LINE__); putnext(q, cmp); } return; } while (mp = getq(q)) { if (mir->mir_krpc_cell) { /* * If we were idle, turn off idle timer since * we aren't idle any more. */ if (mir->mir_ref_cnt++ == 0) stop_timer = B_TRUE; if (mir_check_len(q, (int32_t)msgdsize(mp), mp)) return; svc_queuereq(q, mp); } else { /* * Count # of times this happens. Should be * never, but experience shows otherwise. */ mir_krpc_cell_null++; freemsg(mp); } } break; case RPC_CLIENT: break; default: RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type); if (q->q_first == NULL) MIR_CLEAR_INRSRV(mir); mutex_exit(&mir->mir_mutex); return; } /* * The timer is stopped after all the messages are processed. * The reason is that stopping the timer releases the mir_mutex * lock temporarily. This means that the request can be serviced * while we are still processing the message queue. This is not * good. So we stop the timer here instead. */ if (stop_timer) { RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref " "cnt going to non zero\n", (void *)WR(q)); mir_svc_idle_stop(WR(q), mir); } if (q->q_first == NULL) { MIR_CLEAR_INRSRV(mir); ASSERT(cmp == NULL); if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) { cmp = mir->mir_svc_pend_mp; mir->mir_svc_pend_mp = NULL; } mutex_exit(&mir->mir_mutex); if (cmp != NULL) { RPCLOG(16, "mir_rsrv: line %d: sending a held " "disconnect/ord rel indication upstream\n", __LINE__); putnext(q, cmp); } return; } mutex_exit(&mir->mir_mutex); } static int mir_svc_policy_fails; /* * Called to send an event code to nfsd/lockd so that it initiates * connection close. */ static int mir_svc_policy_notify(queue_t *q, int event) { mblk_t *mp; #ifdef DEBUG mir_t *mir = (mir_t *)q->q_ptr; ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); #endif ASSERT(q->q_flag & QREADR); /* * Create an M_DATA message with the event code and pass it to the * Stream head (nfsd or whoever created the stream will consume it). */ mp = allocb(sizeof (int), BPRI_HI); if (!mp) { mir_svc_policy_fails++; RPCLOG(16, "mir_svc_policy_notify: could not allocate event " "%d\n", event); return (ENOMEM); } U32_TO_BE32(event, mp->b_rptr); mp->b_wptr = mp->b_rptr + sizeof (int); putnext(q, mp); return (0); } /* * Server side: start the close phase. We want to get this rpcmod slot in an * idle state before mir_close() is called. */ static void mir_svc_start_close(queue_t *wq, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); ASSERT((wq->q_flag & QREADR) == 0); ASSERT(mir->mir_type == RPC_SERVER); /* * Do not accept any more messages. */ mir->mir_svc_no_more_msgs = 1; /* * Next two statements will make the read service procedure invoke * svc_queuereq() on everything stuck in the streams read queue. * It's not necessary because enabling the write queue will * have the same effect, but why not speed the process along? */ mir->mir_hold_inbound = 0; qenable(RD(wq)); /* * Meanwhile force the write service procedure to send the * responses downstream, regardless of flow control. */ qenable(wq); } /* * This routine is called directly by KRPC after a request is completed, * whether a reply was sent or the request was dropped. */ static void mir_svc_release(queue_t *wq, mblk_t *mp) { mir_t *mir = (mir_t *)wq->q_ptr; mblk_t *cmp = NULL; ASSERT((wq->q_flag & QREADR) == 0); if (mp) freemsg(mp); mutex_enter(&mir->mir_mutex); /* * Start idle processing if this is the last reference. */ if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) { RPCLOG(16, "mir_svc_release starting idle timer on 0x%p " "because ref cnt is zero\n", (void *) wq); cmp = mir->mir_svc_pend_mp; mir->mir_svc_pend_mp = NULL; mir_svc_idle_start(wq, mir); } mir->mir_ref_cnt--; ASSERT(mir->mir_ref_cnt >= 0); /* * Wake up the thread waiting to close. */ if ((mir->mir_ref_cnt == 0) && mir->mir_closing) cv_signal(&mir->mir_condvar); mutex_exit(&mir->mir_mutex); if (cmp) { RPCLOG(16, "mir_svc_release: sending a held " "disconnect/ord rel indication upstream on queue 0x%p\n", (void *)RD(wq)); putnext(RD(wq), cmp); } } /* * This routine is called by server-side KRPC when it is ready to * handle inbound messages on the stream. */ static void mir_svc_start(queue_t *wq) { mir_t *mir = (mir_t *)wq->q_ptr; /* * no longer need to take the mir_mutex because the * mir_setup_complete field has been moved out of * the binary field protected by the mir_mutex. */ mir->mir_setup_complete = 1; qenable(RD(wq)); } /* * client side wrapper for stopping timer with normal idle timeout. */ static void mir_clnt_idle_stop(queue_t *wq, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); ASSERT((wq->q_flag & QREADR) == 0); ASSERT(mir->mir_type == RPC_CLIENT); mir_timer_stop(mir); } /* * client side wrapper for stopping timer with normal idle timeout. */ static void mir_clnt_idle_start(queue_t *wq, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); ASSERT((wq->q_flag & QREADR) == 0); ASSERT(mir->mir_type == RPC_CLIENT); mir_timer_start(wq, mir, mir->mir_idle_timeout); } /* * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on * end-points that aren't connected. */ static void mir_clnt_idle_do_stop(queue_t *wq) { mir_t *mir = (mir_t *)wq->q_ptr; RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq); ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); mutex_enter(&mir->mir_mutex); mir_clnt_idle_stop(wq, mir); mutex_exit(&mir->mir_mutex); } /* * Timer handler. It handles idle timeout and memory shortage problem. */ static void mir_timer(void *arg) { queue_t *wq = (queue_t *)arg; mir_t *mir = (mir_t *)wq->q_ptr; boolean_t notify; mutex_enter(&mir->mir_mutex); /* * mir_timer_call is set only when either mir_timer_[start|stop] * is progressing. And mir_timer() can only be run while they * are progressing if the timer is being stopped. So just * return. */ if (mir->mir_timer_call) { mutex_exit(&mir->mir_mutex); return; } mir->mir_timer_id = 0; switch (mir->mir_type) { case RPC_CLIENT: /* * For clients, the timer fires at clnt_idle_timeout * intervals. If the activity marker (mir_clntreq) is * zero, then the stream has been idle since the last * timer event and we notify KRPC. If mir_clntreq is * non-zero, then the stream is active and we just * restart the timer for another interval. mir_clntreq * is set to 1 in mir_wput for every request passed * downstream. * * If this was a memory shortage timer reset the idle * timeout regardless; the mir_clntreq will not be a * valid indicator. * * The timer is initially started in mir_wput during * RPC_CLIENT ioctl processing. * * The timer interval can be changed for individual * streams with the ND variable "mir_idle_timeout". */ if (mir->mir_clntreq > 0 && mir->mir_use_timestamp + MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) { clock_t tout; tout = mir->mir_idle_timeout - TICK_TO_MSEC(lbolt - mir->mir_use_timestamp); if (tout < 0) tout = 1000; #if 0 printf("mir_timer[%d < %d + %d]: reset client timer to %d (ms)\n", TICK_TO_MSEC(lbolt), TICK_TO_MSEC(mir->mir_use_timestamp), mir->mir_idle_timeout, tout); #endif mir->mir_clntreq = 0; mir_timer_start(wq, mir, tout); mutex_exit(&mir->mir_mutex); return; } #if 0 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz); #endif /* * We are disconnecting, but not necessarily * closing. By not closing, we will fail to * pick up a possibly changed global timeout value, * unless we store it now. */ mir->mir_idle_timeout = clnt_idle_timeout; mir_clnt_idle_start(wq, mir); mutex_exit(&mir->mir_mutex); /* * We pass T_ORDREL_REQ as an integer value * to KRPC as the indication that the stream * is idle. This is not a T_ORDREL_REQ message, * it is just a convenient value since we call * the same KRPC routine for T_ORDREL_INDs and * T_DISCON_INDs. */ clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0); return; case RPC_SERVER: /* * For servers, the timer is only running when the stream * is really idle or memory is short. The timer is started * by mir_wput when mir_type is set to RPC_SERVER and * by mir_svc_idle_start whenever the stream goes idle * (mir_ref_cnt == 0). The timer is cancelled in * mir_rput whenever a new inbound request is passed to KRPC * and the stream was previously idle. * * The timer interval can be changed for individual * streams with the ND variable "mir_idle_timeout". * * If the stream is not idle do nothing. */ if (!MIR_SVC_QUIESCED(mir)) { mutex_exit(&mir->mir_mutex); return; } notify = !mir->mir_inrservice; mutex_exit(&mir->mir_mutex); /* * If there is no packet queued up in read queue, the stream * is really idle so notify nfsd to close it. */ if (notify) { RPCLOG(16, "mir_timer: telling stream head listener " "to close stream (0x%p)\n", (void *) RD(wq)); (void) mir_svc_policy_notify(RD(wq), 1); } return; default: RPCLOG(1, "mir_timer: unexpected mir_type %d\n", mir->mir_type); mutex_exit(&mir->mir_mutex); return; } } /* * Called by the RPC package to send either a call or a return, or a * transport connection request. Adds the record marking header. */ static void mir_wput(queue_t *q, mblk_t *mp) { uint_t frag_header; mir_t *mir = (mir_t *)q->q_ptr; uchar_t *rptr = mp->b_rptr; if (!mir) { freemsg(mp); return; } if (mp->b_datap->db_type != M_DATA) { mir_wput_other(q, mp); return; } if (mir->mir_ordrel_pending == 1) { freemsg(mp); RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n", (void *)q); return; } frag_header = (uint_t)DLEN(mp); frag_header |= MIR_LASTFRAG; /* Stick in the 4 byte record marking header. */ if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) || !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) { /* * Since we know that M_DATA messages are created exclusively * by KRPC, we expect that KRPC will leave room for our header * and 4 byte align which is normal for XDR. * If KRPC (or someone else) does not cooperate, then we * just throw away the message. */ RPCLOG(1, "mir_wput: KRPC did not leave space for record " "fragment header (%d bytes left)\n", (int)(rptr - mp->b_datap->db_base)); freemsg(mp); return; } rptr -= sizeof (uint32_t); *(uint32_t *)rptr = htonl(frag_header); mp->b_rptr = rptr; mutex_enter(&mir->mir_mutex); if (mir->mir_type == RPC_CLIENT) { /* * For the client, set mir_clntreq to indicate that the * connection is active. */ mir->mir_clntreq = 1; mir->mir_use_timestamp = lbolt; } /* * If we haven't already queued some data and the downstream module * can accept more data, send it on, otherwise we queue the message * and take other actions depending on mir_type. */ if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) { mutex_exit(&mir->mir_mutex); /* * Now we pass the RPC message downstream. */ putnext(q, mp); return; } switch (mir->mir_type) { case RPC_CLIENT: /* * Check for a previous duplicate request on the * queue. If there is one, then we throw away * the current message and let the previous one * go through. If we can't find a duplicate, then * send this one. This tap dance is an effort * to reduce traffic and processing requirements * under load conditions. */ if (mir_clnt_dup_request(q, mp)) { mutex_exit(&mir->mir_mutex); freemsg(mp); return; } break; case RPC_SERVER: /* * Set mir_hold_inbound so that new inbound RPC * messages will be held until the client catches * up on the earlier replies. This flag is cleared * in mir_wsrv after flow control is relieved; * the read-side queue is also enabled at that time. */ mir->mir_hold_inbound = 1; break; default: RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type); break; } mir->mir_inwservice = 1; (void) putq(q, mp); mutex_exit(&mir->mir_mutex); } static void mir_wput_other(queue_t *q, mblk_t *mp) { mir_t *mir = (mir_t *)q->q_ptr; struct iocblk *iocp; uchar_t *rptr = mp->b_rptr; bool_t flush_in_svc = FALSE; ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex)); switch (mp->b_datap->db_type) { case M_IOCTL: iocp = (struct iocblk *)rptr; switch (iocp->ioc_cmd) { case RPC_CLIENT: mutex_enter(&mir->mir_mutex); if (mir->mir_type != 0 && mir->mir_type != iocp->ioc_cmd) { ioc_eperm: mutex_exit(&mir->mir_mutex); iocp->ioc_error = EPERM; iocp->ioc_count = 0; mp->b_datap->db_type = M_IOCACK; qreply(q, mp); return; } mir->mir_type = iocp->ioc_cmd; /* * Clear mir_hold_inbound which was set to 1 by * mir_open. This flag is not used on client * streams. */ mir->mir_hold_inbound = 0; mir->mir_max_msg_sizep = &clnt_max_msg_size; /* * Start the idle timer. See mir_timer() for more * information on how client timers work. */ mir->mir_idle_timeout = clnt_idle_timeout; mir_clnt_idle_start(q, mir); mutex_exit(&mir->mir_mutex); mp->b_datap->db_type = M_IOCACK; qreply(q, mp); return; case RPC_SERVER: mutex_enter(&mir->mir_mutex); if (mir->mir_type != 0 && mir->mir_type != iocp->ioc_cmd) goto ioc_eperm; /* * We don't clear mir_hold_inbound here because * mir_hold_inbound is used in the flow control * model. If we cleared it here, then we'd commit * a small violation to the model where the transport * might immediately block downstream flow. */ mir->mir_type = iocp->ioc_cmd; mir->mir_max_msg_sizep = &svc_max_msg_size; /* * Start the idle timer. See mir_timer() for more * information on how server timers work. * * Note that it is important to start the idle timer * here so that connections time out even if we * never receive any data on them. */ mir->mir_idle_timeout = svc_idle_timeout; RPCLOG(16, "mir_wput_other starting idle timer on 0x%p " "because we got RPC_SERVER ioctl\n", (void *)q); mir_svc_idle_start(q, mir); mutex_exit(&mir->mir_mutex); mp->b_datap->db_type = M_IOCACK; qreply(q, mp); return; default: break; } break; case M_PROTO: if (mir->mir_type == RPC_CLIENT) { /* * We are likely being called from the context of a * service procedure. So we need to enqueue. However * enqueing may put our message behind data messages. * So flush the data first. */ flush_in_svc = TRUE; } if ((mp->b_wptr - rptr) < sizeof (uint32_t) || !IS_P2ALIGNED(rptr, sizeof (uint32_t))) break; switch (((union T_primitives *)rptr)->type) { case T_DATA_REQ: /* Don't pass T_DATA_REQ messages downstream. */ freemsg(mp); return; case T_ORDREL_REQ: RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n", (void *)q); mutex_enter(&mir->mir_mutex); if (mir->mir_type != RPC_SERVER) { /* * We are likely being called from * clnt_dispatch_notifyall(). Sending * a T_ORDREL_REQ will result in * a some kind of _IND message being sent, * will be another call to * clnt_dispatch_notifyall(). To keep the stack * lean, queue this message. */ mir->mir_inwservice = 1; (void) putq(q, mp); mutex_exit(&mir->mir_mutex); return; } /* * Mark the structure such that we don't accept any * more requests from client. We could defer this * until we actually send the orderly release * request downstream, but all that does is delay * the closing of this stream. */ RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ " " so calling mir_svc_start_close\n", (void *)q); mir_svc_start_close(q, mir); /* * If we have sent down a T_ORDREL_REQ, don't send * any more. */ if (mir->mir_ordrel_pending) { freemsg(mp); mutex_exit(&mir->mir_mutex); return; } /* * If the stream is not idle, then we hold the * orderly release until it becomes idle. This * ensures that KRPC will be able to reply to * all requests that we have passed to it. * * We also queue the request if there is data already * queued, because we cannot allow the T_ORDREL_REQ * to go before data. When we had a separate reply * count, this was not a problem, because the * reply count was reconciled when mir_wsrv() * completed. */ if (!MIR_SVC_QUIESCED(mir) || mir->mir_inwservice == 1) { mir->mir_inwservice = 1; (void) putq(q, mp); RPCLOG(16, "mir_wput_other: queuing " "T_ORDREL_REQ on 0x%p\n", (void *)q); mutex_exit(&mir->mir_mutex); return; } /* * Mark the structure so that we know we sent * an orderly release request, and reset the idle timer. */ mir->mir_ordrel_pending = 1; RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start" " on 0x%p because we got T_ORDREL_REQ\n", (void *)q); mir_svc_idle_start(q, mir); mutex_exit(&mir->mir_mutex); /* * When we break, we will putnext the T_ORDREL_REQ. */ break; case T_CONN_REQ: mutex_enter(&mir->mir_mutex); if (mir->mir_head_mp != NULL) { freemsg(mir->mir_head_mp); mir->mir_head_mp = NULL; mir->mir_tail_mp = NULL; } mir->mir_frag_len = -(int32_t)sizeof (uint32_t); /* * Restart timer in case mir_clnt_idle_do_stop() was * called. */ mir->mir_idle_timeout = clnt_idle_timeout; mir_clnt_idle_stop(q, mir); mir_clnt_idle_start(q, mir); mutex_exit(&mir->mir_mutex); break; default: /* * T_DISCON_REQ is one of the interesting default * cases here. Ideally, an M_FLUSH is done before * T_DISCON_REQ is done. However, that is somewhat * cumbersome for clnt_cots.c to do. So we queue * T_DISCON_REQ, and let the service procedure * flush all M_DATA. */ break; } /* fallthru */; default: if (mp->b_datap->db_type >= QPCTL) { if (mp->b_datap->db_type == M_FLUSH) { if (mir->mir_type == RPC_CLIENT && *mp->b_rptr & FLUSHW) { RPCLOG(32, "mir_wput_other: flushing " "wq 0x%p\n", (void *)q); if (*mp->b_rptr & FLUSHBAND) { flushband(q, *(mp->b_rptr + 1), FLUSHDATA); } else { flushq(q, FLUSHDATA); } } else { RPCLOG(32, "mir_wput_other: ignoring " "M_FLUSH on wq 0x%p\n", (void *)q); } } break; } mutex_enter(&mir->mir_mutex); if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) { mutex_exit(&mir->mir_mutex); break; } mir->mir_inwservice = 1; mir->mir_inwflushdata = flush_in_svc; (void) putq(q, mp); mutex_exit(&mir->mir_mutex); qenable(q); return; } putnext(q, mp); } static void mir_wsrv(queue_t *q) { mblk_t *mp; mir_t *mir; bool_t flushdata; mir = (mir_t *)q->q_ptr; mutex_enter(&mir->mir_mutex); flushdata = mir->mir_inwflushdata; mir->mir_inwflushdata = 0; while (mp = getq(q)) { if (mp->b_datap->db_type == M_DATA) { /* * Do not send any more data if we have sent * a T_ORDREL_REQ. */ if (flushdata || mir->mir_ordrel_pending == 1) { freemsg(mp); continue; } /* * Make sure that the stream can really handle more * data. */ if (!MIR_WCANPUTNEXT(mir, q)) { (void) putbq(q, mp); mutex_exit(&mir->mir_mutex); return; } /* * Now we pass the RPC message downstream. */ mutex_exit(&mir->mir_mutex); putnext(q, mp); mutex_enter(&mir->mir_mutex); continue; } /* * This is not an RPC message, pass it downstream * (ignoring flow control) if the server side is not sending a * T_ORDREL_REQ downstream. */ if (mir->mir_type != RPC_SERVER || ((union T_primitives *)mp->b_rptr)->type != T_ORDREL_REQ) { mutex_exit(&mir->mir_mutex); putnext(q, mp); mutex_enter(&mir->mir_mutex); continue; } if (mir->mir_ordrel_pending == 1) { /* * Don't send two T_ORDRELs */ freemsg(mp); continue; } /* * Mark the structure so that we know we sent an orderly * release request. We will check to see slot is idle at the * end of this routine, and if so, reset the idle timer to * handle orderly release timeouts. */ mir->mir_ordrel_pending = 1; RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n", (void *)q); /* * Send the orderly release downstream. If there are other * pending replies we won't be able to send them. However, * the only reason we should send the orderly release is if * we were idle, or if an unusual event occurred. */ mutex_exit(&mir->mir_mutex); putnext(q, mp); mutex_enter(&mir->mir_mutex); } if (q->q_first == NULL) /* * If we call mir_svc_idle_start() below, then * clearing mir_inwservice here will also result in * any thread waiting in mir_close() to be signaled. */ mir->mir_inwservice = 0; if (mir->mir_type != RPC_SERVER) { mutex_exit(&mir->mir_mutex); return; } /* * If idle we call mir_svc_idle_start to start the timer (or wakeup * a close). Also make sure not to start the idle timer on the * listener stream. This can cause nfsd to send an orderly release * command on the listener stream. */ if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) { RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p " "because mir slot is idle\n", (void *)q); mir_svc_idle_start(q, mir); } /* * If outbound flow control has been relieved, then allow new * inbound requests to be processed. */ if (mir->mir_hold_inbound) { mir->mir_hold_inbound = 0; qenable(RD(q)); } mutex_exit(&mir->mir_mutex); } static void mir_disconnect(queue_t *q, mir_t *mir) { ASSERT(MUTEX_HELD(&mir->mir_mutex)); switch (mir->mir_type) { case RPC_CLIENT: /* * We are disconnecting, but not necessarily * closing. By not closing, we will fail to * pick up a possibly changed global timeout value, * unless we store it now. */ mir->mir_idle_timeout = clnt_idle_timeout; mir_clnt_idle_start(WR(q), mir); mutex_exit(&mir->mir_mutex); /* * T_DISCON_REQ is passed to KRPC as an integer value * (this is not a TPI message). It is used as a * convenient value to indicate a sanity check * failure -- the same KRPC routine is also called * for T_DISCON_INDs and T_ORDREL_INDs. */ clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0); break; case RPC_SERVER: mir->mir_svc_no_more_msgs = 1; mir_svc_idle_stop(WR(q), mir); mutex_exit(&mir->mir_mutex); RPCLOG(16, "mir_disconnect: telling " "stream head listener to disconnect stream " "(0x%p)\n", (void *) q); (void) mir_svc_policy_notify(q, 2); break; default: mutex_exit(&mir->mir_mutex); break; } } /* * do a sanity check on the length of the fragment. * returns 1 if bad else 0. */ static int mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp) { mir_t *mir; mir = (mir_t *)q->q_ptr; /* * Do a sanity check on the message length. If this message is * getting excessively large, shut down the connection. */ if ((frag_len <= 0) || (mir->mir_max_msg_sizep == NULL) || (frag_len <= *mir->mir_max_msg_sizep)) { return (0); } freemsg(head_mp); mir->mir_head_mp = (mblk_t *)0; mir->mir_frag_len = -(int)sizeof (uint32_t); if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) { cmn_err(CE_NOTE, "KRPC: record fragment from %s of size(%d) exceeds " "maximum (%u). Disconnecting", (mir->mir_type == RPC_CLIENT) ? "server" : (mir->mir_type == RPC_SERVER) ? "client" : "test tool", frag_len, *mir->mir_max_msg_sizep); } mir_disconnect(q, mir); return (1); }