/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/* Network data replicator Client side */

#include
#include
#include
#include
#include
#include
#include
#include

#ifdef _SunOS_2_6
/*
 * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
 * define enum_t here as it is all we need from rpc/types.h
 * anyway and make it look like we included it. Yuck.
 */
#define _RPC_TYPES_H
typedef int enum_t;
#else
#ifndef DS_DDICT
#include
#endif
#endif /* _SunOS_2_6 */

#ifndef DS_DDICT
#include
#include
#include
#endif
#include
#include
#ifdef DS_DDICT
#include
#endif
#include
#include	/* dtrace is S10 or later */

#include "rdc_io.h"
#include "rdc_clnt.h"
#include "rdc_bitmap.h"
#include "rdc_diskq.h"

kmutex_t rdc_clnt_lock;

#ifdef DEBUG
int noflush = 0;
#endif

int rdc_rpc_tmout = RDC_CLNT_TMOUT;

static void rdc_clnt_free(struct chtab *, CLIENT *);
static void _rdc_remote_flush(rdc_aio_t *);

void rdc_flush_memq(int index);
void rdc_flush_diskq(int index);
int rdc_drain_net_queue(int index);
void rdc_flusher_thread(int index);
int rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *);
void rdc_init_diskq_header(rdc_group_t *grp, dqheader *hd);
void rdc_dump_iohdrs(disk_queue *dq);
rdc_aio_t *rdc_dequeue(rdc_k_info_t *krdc, int *rc);
void rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_off_t qpos);
void rdc_close_diskq(rdc_group_t *krdc);
int rdc_writer(int index);

static struct chtab *rdc_chtable = NULL;
static int rdc_clnt_toomany;
#ifdef DEBUG
static int rdc_ooreply;
#endif

extern void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag);
extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

static enum clnt_stat
cl_call_sig(struct __client *rh, rpcproc_t proc, xdrproc_t xargs,
    caddr_t argsp, xdrproc_t xres, caddr_t resp, struct timeval secs)
{
	enum clnt_stat stat;
	k_sigset_t smask;

	sigintr(&smask, 0);
	rh->cl_nosignal = TRUE;
	stat = ((*(rh)->cl_ops->cl_call)\
	    (rh, proc, xargs, argsp, xres, resp, secs));
	rh->cl_nosignal = FALSE;
	sigunintr(&smask);
	return (stat);
}

int
rdc_net_getsize(int index, uint64_t *sizeptr)
{
	struct timeval t;
	int err, size;
	rdc_k_info_t *krdc = &rdc_k_info[index];
	int remote_index = krdc->remote_index;

	*sizeptr = 0;
	if (krdc->remote_index < 0)
		return (EINVAL);

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

#ifdef DEBUG
	if (krdc->intf == NULL)
		cmn_err(CE_WARN,
		    "!rdc_net_getsize: null intf for index %d", index);
#endif
	if (krdc->rpc_version <= RDC_VERSION5) {
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE,
		    krdc->rpc_version, xdr_int, (char *)&remote_index,
		    xdr_int, (char *)&size, &t);
		if (err == 0)
			*sizeptr = size;
	} else {
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE6,
		    krdc->rpc_version, xdr_int, (char
*)&remote_index, xdr_u_longlong_t, (char *)sizeptr, &t); } return (err); } int rdc_net_state(int index, int options) { struct timeval t; int err; int remote_index = -1; rdc_u_info_t *urdc = &rdc_u_info[index]; rdc_k_info_t *krdc = &rdc_k_info[index]; struct set_state s; struct set_state4 s4; char neta[32], rneta[32]; unsigned short *sp; t.tv_sec = rdc_rpc_tmout; t.tv_usec = 0; if (krdc->rpc_version < RDC_VERSION7) { s4.netaddrlen = urdc->primary.addr.len; s4.rnetaddrlen = urdc->secondary.addr.len; bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen); bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen); (void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN); (void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN); s4.flag = options; err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE, krdc->rpc_version, xdr_set_state4, (char *)&s4, xdr_int, (char *)&remote_index, &t); } else { s.netaddrlen = urdc->primary.addr.len; s.rnetaddrlen = urdc->secondary.addr.len; s.netaddr.buf = neta; s.rnetaddr.buf = rneta; bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen); bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen); s.netaddr.len = urdc->primary.addr.len; s.rnetaddr.len = urdc->secondary.addr.len; s.netaddr.maxlen = urdc->primary.addr.len; s.rnetaddr.maxlen = urdc->secondary.addr.len; sp = (unsigned short *)s.netaddr.buf; *sp = htons(*sp); sp = (unsigned short *)s.rnetaddr.buf; *sp = htons(*sp); s.pfile = urdc->primary.file; s.sfile = urdc->secondary.file; s.flag = options; err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE, krdc->rpc_version, xdr_set_state, (char *)&s, xdr_int, (char *)&remote_index, &t); } if (err) return (-1); else return (remote_index); } /* * rdc_net_getbmap * gets the bitmaps from remote side and or's them with remote bitmap */ int rdc_net_getbmap(int index, int size) { struct timeval t; int err; struct bmap b; struct bmap6 b6; rdc_k_info_t *krdc; krdc = &rdc_k_info[index]; if (krdc->remote_index < 0) return (EINVAL); t.tv_sec = rdc_rpc_tmout; t.tv_usec = 0; #ifdef DEBUG if (krdc->intf == NULL) cmn_err(CE_WARN, "!rdc_net_getbmap: null intf for index %d", index); #endif if (krdc->rpc_version <= RDC_VERSION5) { b.cd = krdc->remote_index; b.dual = index; b.size = size; err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP, krdc->rpc_version, xdr_bmap, (char *)&b, xdr_int, (char *)&err, &t); } else { b6.cd = krdc->remote_index; b6.dual = index; b6.size = size; err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP6, krdc->rpc_version, xdr_bmap6, (char *)&b6, xdr_int, (char *)&err, &t); } return (err); } int sndr_proto = 0; /* * return state corresponding to rdc_host */ int rdc_net_getstate(rdc_k_info_t *krdc, int *serial_mode, int *use_mirror, int *mirror_down, int network) { int err; struct timeval t; int state; rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; struct set_state s; #ifdef sparc struct set_state4 s4; #endif char neta[32]; char rneta[32]; unsigned short *sp; char *setp = (char *)&s; xdrproc_t xdr_proc = xdr_set_state; if (krdc->lsrv && (krdc->intf == NULL || krdc->intf->if_down) && network) /* fail fast */ return (-1); s.netaddrlen = urdc->primary.addr.len; s.rnetaddrlen = urdc->secondary.addr.len; s.pfile = urdc->primary.file; s.sfile = urdc->secondary.file; s.netaddr.buf = neta; s.rnetaddr.buf = rneta; bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen); bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen); sp = (unsigned short *) s.netaddr.buf; *sp = htons(*sp); sp = (unsigned short *) s.rnetaddr.buf; *sp = htons(*sp); s.netaddr.len = 
urdc->primary.addr.len; s.rnetaddr.len = urdc->secondary.addr.len; s.netaddr.maxlen = urdc->primary.addr.maxlen; s.rnetaddr.maxlen = urdc->secondary.addr.maxlen; s.flag = 0; t.tv_sec = rdc_rpc_tmout; t.tv_usec = 0; if (sndr_proto) krdc->rpc_version = sndr_proto; else krdc->rpc_version = RDC_VERS_MAX; again: err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSTATE4, krdc->rpc_version, xdr_proc, setp, xdr_int, (char *)&state, &t); if (err == RPC_PROGVERSMISMATCH && (krdc->rpc_version != RDC_VERS_MIN)) { if (krdc->rpc_version-- == RDC_VERSION7) { /* set_state struct changed with v7 of protocol */ #ifdef sparc s4.netaddrlen = urdc->primary.addr.len; s4.rnetaddrlen = urdc->secondary.addr.len; bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen); bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen); (void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN); (void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN); s4.flag = 0; xdr_proc = xdr_set_state4; setp = (char *)&s4; #else /* x64 can not use protocols < 7 */ return (-1); #endif } goto again; } #ifdef DEBUG cmn_err(CE_NOTE, "!sndr get_state: Protocol ver %d", krdc->rpc_version); #endif if (err) { return (-1); } if (state == -1) return (-1); if (serial_mode) *serial_mode = (state >> 2) & 1; if (use_mirror) *use_mirror = (state >> 1) & 1; if (mirror_down) *mirror_down = state & 1; return (0); } static struct xdr_discrim rdres_discrim[2] = { { (int)RDC_OK, xdr_readok }, { __dontcare__, NULL_xdrproc_t } }; /* * Reply from remote read (client side) */ static bool_t xdr_rdresult(XDR *xdrs, readres *rr) { return (xdr_union(xdrs, (enum_t *)&(rr->rr_status), (caddr_t)&(rr->rr_ok), rdres_discrim, xdr_void)); } static int rdc_rrstatus_decode(int status) { int ret = 0; if (status != RDC_OK) { switch (status) { case RDCERR_NOENT: ret = ENOENT; break; case RDCERR_NOMEM: ret = ENOMEM; break; default: ret = EIO; break; } } return (ret); } int rdc_net_read(int local_index, int remote_index, nsc_buf_t *handle, nsc_off_t fba_pos, nsc_size_t fba_len) { struct rdcrdresult rr; rdc_k_info_t *krdc; rdc_u_info_t *urdc; struct rread list; struct rread6 list6; struct timeval t; uchar_t *sv_addr; nsc_vec_t *vec; int rpc_flag; nsc_size_t sv_len; int err; int ret; nsc_size_t len; nsc_size_t maxfbas; int transflag; if (handle == NULL) return (EINVAL); if (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len)) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_read: handle bounds"); #endif return (EINVAL); } krdc = &rdc_k_info[local_index]; urdc = &rdc_u_info[local_index]; maxfbas = MAX_RDC_FBAS; if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) { nsc_buf_t *remote_h = NULL; int reserved = 0; ret = nsc_reserve(krdc->remote_fd, NSC_MULTI); if (RDC_SUCCESS(ret)) { reserved = 1; ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len, NSC_RDBUF, &remote_h); } if (RDC_SUCCESS(ret)) { ret = nsc_copy(remote_h, handle, fba_pos, fba_pos, fba_len); if (RDC_SUCCESS(ret)) { (void) nsc_free_buf(remote_h); nsc_release(krdc->remote_fd); return (0); } } rdc_group_enter(krdc); rdc_set_flags(urdc, RDC_FCAL_FAILED); rdc_group_exit(krdc); if (remote_h) (void) nsc_free_buf(remote_h); if (reserved) nsc_release(krdc->remote_fd); } t.tv_sec = rdc_rpc_tmout; t.tv_usec = 0; if (rdc_get_vflags(urdc) & RDC_VOL_FAILED) rpc_flag = RDC_RREAD_FAIL; else rpc_flag = 0; #ifdef DEBUG if (krdc->intf == NULL) cmn_err(CE_WARN, "!rdc_net_read: null intf for index %d", local_index); #endif /* * switch on proto version. 
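 * Version 5 and earlier of the protocol use the 32 bit rread
 * request (positions and lengths must fit in an int); version 6
 * and later use rread6 with 64 bit offsets.  In both cases the
 * data itself is then pulled over in chunks of at most maxfbas
 * FBAs by the loop below.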
*/ len = fba_len; /* length (FBAs) still to xfer */ rr.rr_bufsize = 0; /* rpc data buffer length (bytes) */ rr.rr_data = NULL; /* rpc data buffer */ transflag = rpc_flag | RDC_RREAD_START; /* setup rpc */ if (krdc->rpc_version <= RDC_VERSION5) { ASSERT(fba_pos <= INT32_MAX); list.pos = (int)fba_pos; /* fba position of start of chunk */ list.cd = remote_index; /* remote end cd */ /* send setup rpc */ list.flag = transflag; ASSERT(len <= INT32_MAX); list.len = (int)len; /* total fba length */ err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5, krdc->rpc_version, xdr_rread, (char *)&list, xdr_int, (char *)&ret, &t); } else { list6.pos = fba_pos; /* fba position of start of chunk */ list6.cd = remote_index; /* remote end cd */ /* send setup rpc */ list6.flag = transflag; /* setup rpc */ ASSERT(len <= INT32_MAX); list6.len = (int)len; /* total fba length */ err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6, krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_int, (char *)&ret, &t); } if (err) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_read: setup err %d", err); #endif if (err == RPC_INTR) ret = EINTR; else ret = ENOLINK; goto remote_rerror; } if (ret == 0) { /* No valid index from r_net_read */ #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_read: no valid index from r_net_read"); #endif return (ENOBUFS); } transflag = rpc_flag | RDC_RREAD_DATA; if (krdc->rpc_version <= RDC_VERSION5) { list.idx = ret; /* save idx to return to server */ list.flag = transflag; /* move onto to data xfer rpcs */ } else { list6.idx = ret; /* save idx to return to server */ list6.flag = transflag; } /* find starting position in handle */ vec = handle->sb_vec; fba_pos -= handle->sb_pos; for (; fba_pos >= FBA_NUM(vec->sv_len); vec++) fba_pos -= FBA_NUM(vec->sv_len); sv_addr = vec->sv_addr + FBA_SIZE(fba_pos); /* data in vector */ sv_len = vec->sv_len - FBA_SIZE(fba_pos); /* bytes in vector */ while (len) { nsc_size_t translen; if (len > maxfbas) { translen = maxfbas; } else { translen = len; } if (FBA_SIZE(translen) > sv_len) { translen = FBA_NUM(sv_len); } len -= translen; if (len == 0) { /* last data xfer rpc - tell server to cleanup */ transflag |= RDC_RREAD_END; } if (!rr.rr_data || (nsc_size_t)rr.rr_bufsize != FBA_SIZE(translen)) { if (rr.rr_data) kmem_free(rr.rr_data, rr.rr_bufsize); ASSERT(FBA_SIZE(translen) <= INT32_MAX); rr.rr_bufsize = FBA_SIZE(translen); rr.rr_data = kmem_alloc(rr.rr_bufsize, KM_NOSLEEP); } if (!rr.rr_data) { /* error */ #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_read: kmem_alloc failed"); #endif return (ENOMEM); } /* get data from remote end */ #ifdef DEBUG if (krdc->intf == NULL) cmn_err(CE_WARN, "!rdc_net_read: null intf for index %d", local_index); #endif if (krdc->io_kstats) { mutex_enter(krdc->io_kstats->ks_lock); kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } /*CONSTCOND*/ ASSERT(RDC_MAXDATA <= INT32_MAX); ASSERT(translen <= RDC_MAXDATA); if (krdc->rpc_version <= RDC_VERSION5) { list.len = (int)translen; list.flag = transflag; err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5, krdc->rpc_version, xdr_rread, (char *)&list, xdr_rdresult, (char *)&rr, &t); } else { list6.len = (int)translen; list6.flag = transflag; err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6, krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_rdresult, (char *)&rr, &t); } if (krdc->io_kstats) { mutex_enter(krdc->io_kstats->ks_lock); kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } if (err) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_read: rpc err %d", err); #endif if 
(err == RPC_INTR) { ret = EINTR; } else { ret = ENOLINK; } goto remote_rerror; } if (rr.rr_status != RDC_OK) { ret = rdc_rrstatus_decode(rr.rr_status); if (!ret) ret = EIO; goto remote_rerror; } /* copy into handle */ bcopy(rr.rr_data, sv_addr, (size_t)rr.rr_bufsize); /* update counters */ sv_addr += rr.rr_bufsize; if (krdc->rpc_version <= RDC_VERSION5) { list.pos += translen; } else { list6.pos += translen; } if (krdc->io_kstats) { KSTAT_IO_PTR(krdc->io_kstats)->reads++; KSTAT_IO_PTR(krdc->io_kstats)->nread += rr.rr_bufsize; } ASSERT(sv_len <= INT32_MAX); ASSERT(sv_len >= (nsc_size_t)rr.rr_bufsize); sv_len -= rr.rr_bufsize; if (sv_len == 0) { /* goto next vector */ vec++; sv_addr = vec->sv_addr; sv_len = vec->sv_len; } } if (rr.rr_data) kmem_free(rr.rr_data, rr.rr_bufsize); return (0); remote_rerror: if (rr.rr_data) kmem_free(rr.rr_data, rr.rr_bufsize); return (ret ? ret : ENOLINK); } /* * rdc_net_write * Main remote write client side * Handles protocol selection as well as requests for remote allocation * and data transfer * Does local IO for FCAL * caller must clear bitmap on success */ int rdc_net_write(int local_index, int remote_index, nsc_buf_t *handle, nsc_off_t fba_pos, nsc_size_t fba_len, uint_t aseq, int qpos, netwriteres *netres) { rdc_k_info_t *krdc; rdc_u_info_t *urdc; struct timeval t; nsc_vec_t *vec; int sv_len; nsc_off_t fpos; int err; struct netwriteres netret; struct netwriteres *netresptr; struct net_data5 dlist5; struct net_data6 dlist6; int ret; nsc_size_t maxfbas; int transflag; int translen; int transendoblk; char *transptr; int vflags; if (handle == NULL) return (EINVAL); /* if not a diskq buffer */ if ((qpos == -1) && (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len))) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_write: handle bounds"); #endif return (EINVAL); } t.tv_sec = rdc_rpc_tmout; t.tv_usec = 0; krdc = &rdc_k_info[local_index]; urdc = &rdc_u_info[local_index]; maxfbas = MAX_RDC_FBAS; /* FCAL IO */ if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) { nsc_buf_t *remote_h = NULL; int reserved = 0; ret = nsc_reserve(krdc->remote_fd, NSC_MULTI); if (RDC_SUCCESS(ret)) { reserved = 1; ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len, NSC_WRBUF, &remote_h); } if (RDC_SUCCESS(ret)) { ret = nsc_copy(handle, remote_h, fba_pos, fba_pos, fba_len); if (RDC_SUCCESS(ret)) ret = nsc_write(remote_h, fba_pos, fba_len, 0); if (RDC_SUCCESS(ret)) { (void) nsc_free_buf(remote_h); nsc_release(krdc->remote_fd); return (0); } } rdc_group_enter(krdc); rdc_set_flags(urdc, RDC_FCAL_FAILED); rdc_group_exit(krdc); if (remote_h) (void) nsc_free_buf(remote_h); if (reserved) nsc_release(krdc->remote_fd); } /* * At this point we must decide which protocol we are using and * do the right thing */ netret.vecdata.vecdata_val = NULL; netret.vecdata.vecdata_len = 0; if (netres) { netresptr = netres; } else { netresptr = &netret; } vflags = rdc_get_vflags(urdc); if (vflags & (RDC_VOL_FAILED|RDC_BMP_FAILED)) transflag = RDC_RWRITE_FAIL; else transflag = 0; #ifdef DEBUG if (krdc->intf == NULL) cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d", local_index); #endif vec = handle->sb_vec; /* * find starting position in vector */ if ((qpos == -1) || (handle->sb_user == RDC_NULLBUFREAD)) fpos = fba_pos - handle->sb_pos; else fpos = (qpos + 1) - handle->sb_pos; for (; fpos >= FBA_NUM(vec->sv_len); vec++) fpos -= FBA_NUM(vec->sv_len); sv_len = vec->sv_len - FBA_SIZE(fpos); /* bytes in vector */ transptr = (char *)vec->sv_addr + FBA_SIZE(fpos); if (krdc->rpc_version <= RDC_VERSION5) { 
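		/*
		 * Build the version 5 write request header; net_data5
		 * only has room for 32 bit disk addresses, hence the
		 * asserts below.  The else branch builds the
		 * equivalent net_data6 header, which carries 64 bit
		 * offsets and the disk queue position (qpos).  idx
		 * stays -1 until the server hands back a buffer index.
		 */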
dlist5.local_cd = local_index; dlist5.cd = remote_index; ASSERT(fba_len <= INT32_MAX); ASSERT(fba_pos <= INT32_MAX); dlist5.len = (int)fba_len; dlist5.pos = (int)fba_pos; dlist5.idx = -1; /* Starting index */ dlist5.flag = transflag; dlist5.seq = aseq; /* sequence number */ dlist5.sfba = (int)fba_pos; /* starting fba for this xfer */ } else { dlist6.local_cd = local_index; dlist6.cd = remote_index; ASSERT(fba_len <= INT32_MAX); dlist6.len = (int)fba_len; dlist6.qpos = qpos; dlist6.pos = fba_pos; dlist6.idx = -1; /* Starting index */ dlist6.flag = transflag; dlist6.seq = aseq; /* sequence number */ dlist6.sfba = fba_pos; /* starting fba for this xfer */ } transendoblk = 0; while (fba_len) { if (!transptr) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_net_write: walked off end of handle!"); #endif ret = EINVAL; goto remote_error; } if (fba_len > maxfbas) { ASSERT(maxfbas <= INT32_MAX); translen = (int)maxfbas; } else { ASSERT(fba_len <= INT32_MAX); translen = (int)fba_len; } if (FBA_SIZE(translen) > sv_len) { translen = FBA_NUM(sv_len); } fba_len -= translen; if (fba_len == 0) { /* last data xfer - tell server to commit */ transendoblk = 1; } #ifdef DEBUG if (krdc->intf == NULL) cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d", local_index); #endif DTRACE_PROBE(rdc_netwrite_clntcall_start); if (krdc->io_kstats) { mutex_enter(krdc->io_kstats->ks_lock); kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } if (krdc->rpc_version <= RDC_VERSION5) { ret = 0; dlist5.nfba = translen; dlist5.endoblk = transendoblk; dlist5.data.data_len = FBA_SIZE(translen); dlist5.data.data_val = transptr; err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE5, krdc->rpc_version, xdr_net_data5, (char *)&dlist5, xdr_int, (char *)&ret, &t); if (ret >= 0) { netresptr->result = 0; netresptr->index = ret; } else { netresptr->result = ret; } } else { netresptr->result = 0; dlist6.nfba = translen; dlist6.endoblk = transendoblk; dlist6.data.data_len = FBA_SIZE(translen); dlist6.data.data_val = transptr; err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version, xdr_net_data6, (char *)&dlist6, xdr_netwriteres, (char *)netresptr, &t); } if (krdc->io_kstats) { mutex_enter(krdc->io_kstats->ks_lock); kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } DTRACE_PROBE(rdc_netwrite_clntcall_end); ret = netresptr->result; if (err) { if (err == RPC_INTR) ret = EINTR; else if (err && ret != EPROTO) ret = ENOLINK; #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_write(5): cd %d err %d ret %d", remote_index, err, ret); #endif goto remote_error; } /* Error from r_net_write5 */ if (netresptr->result < 0) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_write: r_net_write(5) " "returned: %d", -netresptr->result); #endif ret = -netresptr->result; if (netret.vecdata.vecdata_val) kmem_free(netret.vecdata.vecdata_val, netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); goto remote_error; } else if (netresptr->index == 0) { #ifdef DEBUG cmn_err(CE_NOTE, "!rdc_net_write: no valid index from " "r_net_write(5)"); #endif ret = ENOBUFS; if (netret.vecdata.vecdata_val) kmem_free(netret.vecdata.vecdata_val, netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); goto remote_error; } if (krdc->rpc_version <= RDC_VERSION5) { dlist5.idx = netresptr->index; dlist5.sfba += dlist5.nfba; } else { dlist6.idx = netresptr->index; dlist6.sfba += dlist6.nfba; } /* update counters */ if (krdc->io_kstats) { KSTAT_IO_PTR(krdc->io_kstats)->writes++; KSTAT_IO_PTR(krdc->io_kstats)->nwritten += FBA_SIZE(translen); } 
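		/*
		 * Step on to the next chunk: advance the source
		 * pointer through the current scatter/gather vector,
		 * and move to the next vector once this one is
		 * exhausted.
		 */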
transptr += FBA_SIZE(translen); sv_len -= FBA_SIZE(translen); if (sv_len <= 0) { /* goto next vector */ vec++; transptr = (char *)vec->sv_addr; sv_len = vec->sv_len; } } /* * this can't happen..... */ if (netret.vecdata.vecdata_val) kmem_free(netret.vecdata.vecdata_val, netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); return (0); remote_error: return (ret ? ret : ENOLINK); } void rdc_fixlen(rdc_aio_t *aio) { nsc_vec_t *vecp = aio->qhandle->sb_vec; nsc_size_t len = 0; while (vecp->sv_addr) { len += FBA_NUM(vecp->sv_len); vecp++; } aio->qhandle->sb_len = len; } /* * rdc_dump_alloc_bufs_cd * Dump allocated buffers (rdc_net_hnd's) for the specified cd. * this could be the flusher failing, if so, don't do the delay forever * Returns: 0 (success), EAGAIN (caller needs to try again). */ int rdc_dump_alloc_bufs_cd(int index) { rdc_k_info_t *krdc; rdc_aio_t *aio; net_queue *q; disk_queue *dq; kmutex_t *qlock; krdc = &rdc_k_info[index]; if (!krdc->c_fd) { /* cannot do anything! */ #ifdef DEBUG cmn_err(CE_WARN, "!rdc_dump_alloc_bufs_cd(%d): c_fd NULL", index); #endif return (0); } rdc_dump_dsets(index); dq = &krdc->group->diskq; if (RDC_IS_DISKQ(krdc->group)) { qlock = QLOCK(dq); (void) _rdc_rsrv_diskq(krdc->group); } else { qlock = &krdc->group->ra_queue.net_qlock; } /* * Now dump the async queue anonymous buffers * if we are a diskq, the we are using the diskq mutex. * However, we are flushing from diskq to memory queue * so we now need to grab the memory lock also */ q = &krdc->group->ra_queue; if (RDC_IS_DISKQ(krdc->group)) { mutex_enter(&q->net_qlock); if (q->qfill_sleeping == RDC_QFILL_AWAKE) { int tries = 5; #ifdef DEBUG_DISKQ cmn_err(CE_NOTE, "!dumpalloccd sending diskq->memq flush to sleep"); #endif q->qfflags |= RDC_QFILLSLEEP; mutex_exit(&q->net_qlock); while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--) delay(5); mutex_enter(&q->net_qlock); } } mutex_enter(qlock); while ((q->net_qhead != NULL)) { rdc_k_info_t *tmpkrdc; aio = q->net_qhead; tmpkrdc = &rdc_k_info[aio->index]; if (RDC_IS_DISKQ(krdc->group)) { aio->qhandle->sb_user--; if (aio->qhandle->sb_user == 0) { rdc_fixlen(aio); (void) nsc_free_buf(aio->qhandle); aio->qhandle = NULL; aio->handle = NULL; } } else { if (aio->handle) { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } } if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(krdc->group)) { mutex_enter(tmpkrdc->io_kstats->ks_lock); kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats)); mutex_exit(tmpkrdc->io_kstats->ks_lock); } q->net_qhead = q->net_qhead->next; q->blocks -= aio->len; q->nitems--; RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len); kmem_free(aio, sizeof (*aio)); } q->net_qtail = NULL; if (krdc->group->asyncstall) { krdc->group->asyncdis = 1; cv_broadcast(&krdc->group->asyncqcv); } if (krdc->group->sleepq) { rdc_sleepqdiscard(krdc->group); } krdc->group->seq = RDC_NEWSEQ; krdc->group->seqack = RDC_NEWSEQ; if (RDC_IS_DISKQ(krdc->group)) { rdc_dump_iohdrs(dq); SET_QNXTIO(dq, QHEAD(dq)); SET_QCOALBOUNDS(dq, QHEAD(dq)); } mutex_exit(qlock); if (RDC_IS_DISKQ(krdc->group)) { mutex_exit(&q->net_qlock); _rdc_rlse_diskq(krdc->group); } return (0); } /* * rdc_dump_alloc_bufs * We have an error on the link * Try to dump all of the allocated bufs so we can cleanly recover * and not hang */ void rdc_dump_alloc_bufs(rdc_if_t *ip) { rdc_k_info_t *krdc; int repeat; int index; for (index = 0; index < rdc_max_sets; index++) { do { krdc = &rdc_k_info[index]; repeat = 0; if (krdc->intf == ip) { if (rdc_dump_alloc_bufs_cd(index) == EAGAIN) { repeat = 1; delay(2); } } } while (repeat); } } 
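/*
 * Async queue throttling.
 *
 * _rdc_diskq_isfull() and _rdc_async_throttle() below are used to slow
 * incoming writes down when the async queue (memory or disk based) is
 * at its configured limits, rather than letting it overflow.
 */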
/* * returns 1 if the the throttle should throttle, 0 if not. */ int _rdc_diskq_isfull(disk_queue *q, long len) { /* ---T----H----N--- */ mutex_enter(QLOCK(q)); if (FITSONQ(q, len + 1)) { mutex_exit(QLOCK(q)); return (0); } mutex_exit(QLOCK(q)); return (1); } void _rdc_async_throttle(rdc_k_info_t *this, long len) { rdc_k_info_t *krdc; rdc_u_info_t *urdc; int print_msg = 1; int tries = RDC_FUTILE_ATTEMPTS; /* * Throttle entries on queue */ /* Need to take the 1-many case into account, checking all sets */ /* ADD HANDY HUERISTIC HERE TO SLOW DOWN IO */ for (krdc = this; /* CSTYLED */; krdc = krdc->many_next) { urdc = &rdc_u_info[krdc->index]; /* * this may be the last set standing in a one to many setup. * we may also be stuck in unintercept, after marking * the volume as not enabled, but have not removed it * from the many list resulting in an endless loop if * we just continue here. Lets jump over this stuff * and check to see if we are the only dude here. */ if (!IS_ENABLED(urdc)) goto thischeck; if (IS_ASYNC(urdc) && RDC_IS_MEMQ(krdc->group)) { net_queue *q = &krdc->group->ra_queue; while ((q->blocks + q->inflbls) > urdc->maxqfbas || (q->nitems + q->inflitems) > urdc->maxqitems) { if (!IS_ENABLED(urdc)) /* disable race */ goto thischeck; if (!krdc->group->rdc_writer) (void) rdc_writer(krdc->index); delay(2); q->throttle_delay++; } } /* do a much more aggressive delay, get disk flush going */ if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group)) { disk_queue *q = &krdc->group->diskq; while ((!IS_QSTATE(q, RDC_QNOBLOCK)) && (_rdc_diskq_isfull(q, len)) && (!IS_STATE(urdc, RDC_DISKQ_FAILED))) { if (print_msg) { cmn_err(CE_WARN, "!rdc async throttle:" " disk queue %s full", &urdc->disk_queue[0]); print_msg = 0; } if (!IS_ENABLED(urdc)) /* disable race */ goto thischeck; if (!krdc->group->rdc_writer) (void) rdc_writer(krdc->index); delay(10); q->throttle_delay += 10; if (!(tries--) && IS_STATE(urdc, RDC_QUEUING)) { cmn_err(CE_WARN, "!SNDR: disk queue " "%s full & not flushing. 
giving up", &urdc->disk_queue[0]); cmn_err(CE_WARN, "!SNDR: %s:%s entering" " logging mode", urdc->secondary.intf, urdc->secondary.file); rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG | RDC_NOFAIL); mutex_enter(QLOCK(q)); cv_broadcast(&q->qfullcv); mutex_exit(QLOCK(q)); } } if ((IS_QSTATE(q, RDC_QNOBLOCK)) && _rdc_diskq_isfull(q, len) && !IS_STATE(urdc, RDC_DISKQ_FAILED)) { if (print_msg) { cmn_err(CE_WARN, "!disk queue %s full", &urdc->disk_queue[0]); print_msg = 0; } rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG | RDC_NOFAIL); mutex_enter(QLOCK(q)); cv_broadcast(&q->qfullcv); mutex_exit(QLOCK(q)); } } thischeck: if (krdc->many_next == this) break; } } int rdc_coalesce = 1; static int rdc_joins = 0; int rdc_aio_coalesce(rdc_aio_t *queued, rdc_aio_t *new) { nsc_buf_t *h = NULL; int rc; rdc_k_info_t *krdc; uint_t bitmask; if (rdc_coalesce == 0) return (0); /* don't even try */ if ((queued == NULL) || (queued->handle == NULL) || (new->handle == NULL)) { return (0); /* existing queue is empty */ } if (queued->index != new->index || queued->len + new->len > MAX_RDC_FBAS) { return (0); /* I/O to big */ } if ((queued->pos + queued->len == new->pos) || (new->pos + new->len == queued->pos)) { rc = nsc_alloc_abuf(queued->pos, queued->len + new->len, 0, &h); if (!RDC_SUCCESS(rc)) { if (h != NULL) (void) nsc_free_buf(h); return (0); /* couldn't do coalesce */ } rc = nsc_copy(queued->handle, h, queued->pos, queued->pos, queued->len); if (!RDC_SUCCESS(rc)) { (void) nsc_free_buf(h); return (0); /* couldn't do coalesce */ } rc = nsc_copy(new->handle, h, new->pos, new->pos, new->len); if (!RDC_SUCCESS(rc)) { (void) nsc_free_buf(h); return (0); /* couldn't do coalesce */ } krdc = &rdc_k_info[queued->index]; RDC_SET_BITMASK(queued->pos, queued->len, &bitmask); RDC_CLR_BITMAP(krdc, queued->pos, queued->len, \ bitmask, RDC_BIT_BUMP); RDC_SET_BITMASK(new->pos, new->len, &bitmask); RDC_CLR_BITMAP(krdc, new->pos, new->len, \ bitmask, RDC_BIT_BUMP); (void) nsc_free_buf(queued->handle); (void) nsc_free_buf(new->handle); queued->handle = h; queued->len += new->len; bitmask = 0; /* * bump the ref count back up */ RDC_SET_BITMAP(krdc, queued->pos, queued->len, &bitmask); return (1); /* new I/O succeeds last I/O queued */ } return (0); } int rdc_memq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio) { net_queue *q; rdc_group_t *group; group = krdc->group; q = &group->ra_queue; mutex_enter(&q->net_qlock); if (rdc_aio_coalesce(q->net_qtail, aio)) { rdc_joins++; q->blocks += aio->len; kmem_free(aio, sizeof (*aio)); goto out; } aio->seq = group->seq++; if (group->seq < aio->seq) group->seq = RDC_NEWSEQ + 1; /* skip magics */ if (q->net_qhead == NULL) { /* adding to empty q */ q->net_qhead = q->net_qtail = aio; #ifdef DEBUG if (q->blocks != 0 || q->nitems != 0) { cmn_err(CE_PANIC, "rdc enqueue: q %p, qhead 0, q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT, (void *) q, q->blocks, q->nitems); } #endif } else { /* discontiguous, add aio to q tail */ q->net_qtail->next = aio; q->net_qtail = aio; } q->blocks += aio->len; q->nitems++; if (krdc->io_kstats) { mutex_enter(krdc->io_kstats->ks_lock); kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } out: #ifdef DEBUG /* sum the q and check for sanity */ { nsc_size_t qblocks = 0; uint64_t nitems = 0; rdc_aio_t *a; for (a = q->net_qhead; a != NULL; a = a->next) { qblocks += a->len; nitems++; } if (qblocks != q->blocks || nitems != q->nitems) { cmn_err(CE_PANIC, "rdc enqueue: q %p, q blocks %" NSC_SZFMT " (%" NSC_SZFMT "), nitems %" NSC_SZFMT " (%" NSC_SZFMT ")", 
(void *) q, q->blocks, qblocks, q->nitems, nitems); } } #endif mutex_exit(&q->net_qlock); if (q->nitems > q->nitems_hwm) { q->nitems_hwm = q->nitems; } if (q->blocks > q->blocks_hwm) { q->blocks_hwm = q->blocks; } if (!krdc->group->rdc_writer) (void) rdc_writer(krdc->index); return (0); } int _rdc_enqueue_write(rdc_k_info_t *krdc, nsc_off_t pos, nsc_size_t len, int flag, nsc_buf_t *h) { rdc_aio_t *aio; rdc_group_t *group; rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; int rc; aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP); if (!aio) { return (ENOMEM); } group = krdc->group; aio->pos = pos; aio->qpos = -1; aio->len = len; aio->flag = flag; aio->index = krdc->index; aio->handle = h; if (group->flags & RDC_MEMQUE) { return (rdc_memq_enqueue(krdc, aio)); } else if ((group->flags & RDC_DISKQUE) && !IS_STATE(urdc, RDC_DISKQ_FAILED)) { rc = rdc_diskq_enqueue(krdc, aio); kmem_free(aio, sizeof (*aio)); return (rc); } return (-1); /* keep lint quiet */ } /* * Async Network RDC flusher */ /* * don't allow any new writer threads to start if a member of the set * is disable pending */ int is_disable_pending(rdc_k_info_t *krdc) { rdc_k_info_t *this = krdc; int rc = 0; do { if (krdc->type_flag & RDC_DISABLEPEND) { krdc = this; rc = 1; break; } krdc = krdc->group_next; } while (krdc != this); return (rc); } /* * rdc_writer -- spawn new writer if not running already * called after enqueing the dirty blocks */ int rdc_writer(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; nsthread_t *t; rdc_group_t *group; kmutex_t *qlock; int tries; const int MAX_TRIES = 16; group = krdc->group; if (RDC_IS_DISKQ(group)) qlock = &group->diskq.disk_qlock; else qlock = &group->ra_queue.net_qlock; mutex_enter(qlock); #ifdef DEBUG if (noflush) { mutex_exit(qlock); return (0); } #endif if ((group->rdc_writer) || is_disable_pending(krdc)) { mutex_exit(qlock); return (0); } if ((group->rdc_thrnum >= 1) && (group->seqack == RDC_NEWSEQ)) { /* * We also need to check if we are starting a new * sequence, and if so don't create a new thread, * as we must ensure that the start of new sequence * requests arrives first to re-init the server. */ mutex_exit(qlock); return (0); } /* * For version 6, * see if we can fit in another thread. */ group->rdc_thrnum++; if (krdc->intf && (krdc->intf->rpc_version >= RDC_VERSION6)) { rdc_u_info_t *urdc = &rdc_u_info[index]; if (group->rdc_thrnum >= urdc->asyncthr) group->rdc_writer = 1; } else { group->rdc_writer = 1; } mutex_exit(qlock); /* * If we got here, we know that we have not exceeded the allowed * number of async threads for our group. If we run out of threads * in _rdc_flset, we add a new thread to the set. */ tries = 0; do { /* first try to grab a thread from the free list */ if (t = nst_create(_rdc_flset, rdc_flusher_thread, (blind_t)(unsigned long)index, 0)) { break; } /* that failed; add a thread to the set and try again */ if (nst_add_thread(_rdc_flset, 1) != 1) { cmn_err(CE_WARN, "!rdc_writer index %d nst_add_thread " "error, tries: %d", index, tries); break; } } while (++tries < MAX_TRIES); if (tries) { mutex_enter(&group->addthrnumlk); group->rdc_addthrnum += tries; mutex_exit(&group->addthrnumlk); } if (t) { return (1); } cmn_err(CE_WARN, "!rdc_writer: index %d nst_create error", index); rdc_many_enter(krdc); mutex_enter(qlock); group->rdc_thrnum--; group->rdc_writer = 0; if ((group->count == 0) && (group->rdc_thrnum == 0)) { mutex_exit(qlock); /* * Race with remove_from_group while write thread was * failing to be created. 
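 * We are holding the last reference to an empty group here, so it
 * is this thread's job to destroy the group before returning the
 * error.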
*/ #ifdef DEBUG cmn_err(CE_WARN, "!rdc_writer: group being destroyed"); #endif rdc_delgroup(group); krdc->group = NULL; rdc_many_exit(krdc); return (-1); } mutex_exit(qlock); rdc_many_exit(krdc); return (-1); } /* * Either we need to flush the * kmem (net_queue) queue or the disk (disk_queue) * determine which, and do it. */ void rdc_flusher_thread(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; if (krdc->group->flags & RDC_MEMQUE) { rdc_flush_memq(index); return; } else if (krdc->group->flags & RDC_DISKQUE) { rdc_flush_diskq(index); return; } else { /* uh-oh, big time */ cmn_err(CE_PANIC, "flusher trying to flush unknown queue type"); } } void rdc_flush_memq(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; rdc_aio_t *aio; net_queue *q; int dowork; rdc_group_t *group = krdc->group; if (!group || group->count == 0) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_memq: no group left!"); #endif return; } if (!krdc->c_fd) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_memq: no c_fd!"); #endif goto thread_death; } #ifdef DEBUG_DISABLE if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) { cmn_err(CE_WARN, "!rdc_flush_memq: DISABLE PENDING!"); /* * Need to continue as we may be trying to flush IO * while trying to disable or suspend */ } #endif q = &group->ra_queue; dowork = 1; /* CONSTCOND */ while (dowork) { if (net_exit == ATM_EXIT) break; group = krdc->group; if (!group || group->count == 0) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_memq: no group left!"); #endif break; } mutex_enter(&q->net_qlock); aio = q->net_qhead; if (aio == NULL) { #ifdef DEBUG if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) { cmn_err(CE_PANIC, "rdc_flush_memq(1): q %p, q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT ", qhead %p qtail %p", (void *) q, q->blocks, q->nitems, (void *) aio, (void *) q->net_qtail); } #endif mutex_exit(&q->net_qlock); break; } /* aio remove from q */ q->net_qhead = aio->next; aio->next = NULL; if (q->net_qtail == aio) q->net_qtail = q->net_qhead; q->blocks -= aio->len; q->nitems--; /* * in flight numbers. */ q->inflbls += aio->len; q->inflitems++; #ifdef DEBUG if (q->net_qhead == NULL) { if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) { cmn_err(CE_PANIC, "rdc_flush_memq(2): q %p, q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT ", qhead %p qtail %p", (void *) q, q->blocks, q->nitems, (void *) q->net_qhead, (void *) q->net_qtail); } } #ifndef NSC_MULTI_TERABYTE if (q->blocks < 0) { cmn_err(CE_PANIC, "rdc_flush_memq(3): q %p, q blocks %" NSC_SZFMT ", nitems %d, qhead %p, qtail %p", (void *) q, q->blocks, q->nitems, (void *) q->net_qhead, (void *) q->net_qtail); } #else /* blocks and nitems are unsigned for NSC_MULTI_TERABYTE */ #endif #endif mutex_exit(&q->net_qlock); aio->iostatus = RDC_IO_INIT; _rdc_remote_flush(aio); mutex_enter(&q->net_qlock); q->inflbls -= aio->len; q->inflitems--; if ((group->seqack == RDC_NEWSEQ) && (group->seq != RDC_NEWSEQ + 1)) { if ((q->net_qhead == NULL) || (q->net_qhead->seq != RDC_NEWSEQ + 1)) { /* * We are an old thread, and the * queue sequence has been reset * during the network write above. * As such we mustn't pull another * job from the queue until the * first sequence message has been ack'ed. * Just die instead. 
Unless this thread * is the first sequence that has just * been ack'ed */ dowork = 0; } } mutex_exit(&q->net_qlock); if ((aio->iostatus != RDC_IO_DONE) && (group->count)) { rdc_k_info_t *krdctmp = &rdc_k_info[aio->index]; if (krdctmp->type_flag & RDC_DISABLEPEND) { kmem_free(aio, sizeof (*aio)); goto thread_death; } rdc_group_enter(krdc); ASSERT(krdc->group); rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE, "memq flush aio status not RDC_IO_DONE"); rdc_group_exit(krdc); rdc_dump_queue(aio->index); } kmem_free(aio, sizeof (*aio)); if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf) break; } thread_death: rdc_many_enter(krdc); mutex_enter(&group->ra_queue.net_qlock); group->rdc_thrnum--; group->rdc_writer = 0; /* * all threads must be dead. */ if ((group->count == 0) && (group->rdc_thrnum == 0)) { mutex_exit(&group->ra_queue.net_qlock); /* * Group now empty, so destroy * Race with remove_from_group while write thread was running */ #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_memq: group being destroyed"); #endif rdc_delgroup(group); krdc->group = NULL; rdc_many_exit(krdc); return; } mutex_exit(&group->ra_queue.net_qlock); rdc_many_exit(krdc); } /* * rdc_flush_diskq * disk queue flusher */ void rdc_flush_diskq(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; rdc_u_info_t *urdc = &rdc_u_info[index]; rdc_aio_t *aio = NULL; disk_queue *q; net_queue *nq; int dowork; int rc; rdc_group_t *group = krdc->group; if (!group || group->count == 0) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!"); #endif return; } if (!krdc->c_fd) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_diskq: no c_fd!"); #endif return; } #ifdef DEBUG_DISABLE if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) { cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING!"); /* * Need to continue as we may be trying to flush IO * while trying to disable or suspend */ } #endif q = &group->diskq; nq = &group->ra_queue; if (IS_QSTATE(q, RDC_QDISABLEPEND) || IS_STATE(urdc, RDC_LOGGING)) { #ifdef DEBUG cmn_err(CE_NOTE, "!flusher thread death 1 %x", QSTATE(q)); #endif goto thread_death; } dowork = 1; /* CONSTCOND */ while (dowork) { if (net_exit == ATM_EXIT) break; group = krdc->group; if (!group || group->count == 0) { #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!"); #endif break; } do { rc = 0; if ((IS_STATE(urdc, RDC_LOGGING)) || (IS_STATE(urdc, RDC_SYNCING)) || (nq->qfflags & RDC_QFILLSLEEP)) goto thread_death; aio = rdc_dequeue(krdc, &rc); if ((IS_STATE(urdc, RDC_LOGGING)) || (IS_STATE(urdc, RDC_SYNCING)) || (nq->qfflags & RDC_QFILLSLEEP)) { goto thread_death; } if (rc == EAGAIN) { delay(40); } } while (rc == EAGAIN); if (aio == NULL) { break; } aio->iostatus = RDC_IO_INIT; mutex_enter(QLOCK(q)); q->inflbls += aio->len; q->inflitems++; mutex_exit(QLOCK(q)); _rdc_remote_flush(aio); mutex_enter(QLOCK(q)); q->inflbls -= aio->len; q->inflitems--; if ((group->seqack == RDC_NEWSEQ) && (group->seq != RDC_NEWSEQ + 1)) { if ((nq->net_qhead == NULL) || (nq->net_qhead->seq != RDC_NEWSEQ + 1)) { /* * We are an old thread, and the * queue sequence has been reset * during the network write above. * As such we mustn't pull another * job from the queue until the * first sequence message has been ack'ed. * Just die instead. Unless of course, * this thread is the first sequence that * has just been ack'ed. 
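 * Once that first request is acknowledged, seqack moves on from
 * RDC_NEWSEQ and rdc_writer() is free to spawn replacement
 * flusher threads.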
*/ dowork = 0; } } mutex_exit(QLOCK(q)); if (aio->iostatus == RDC_IO_CANCELLED) { rdc_dump_queue(aio->index); kmem_free(aio, sizeof (*aio)); aio = NULL; if (group) { /* seq gets bumped on dequeue */ mutex_enter(QLOCK(q)); rdc_dump_iohdrs(q); SET_QNXTIO(q, QHEAD(q)); SET_QCOALBOUNDS(q, QHEAD(q)); group->seq = RDC_NEWSEQ; group->seqack = RDC_NEWSEQ; mutex_exit(QLOCK(q)); } break; } if ((aio->iostatus != RDC_IO_DONE) && (group->count)) { rdc_k_info_t *krdctmp = &rdc_k_info[aio->index]; if (krdctmp->type_flag & RDC_DISABLEPEND) { kmem_free(aio, sizeof (*aio)); aio = NULL; goto thread_death; } rdc_group_enter(krdc); rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE | RDC_QUEUING, "diskq flush aio status not RDC_IO_DONE"); rdc_group_exit(krdc); rdc_dump_queue(aio->index); } kmem_free(aio, sizeof (*aio)); aio = NULL; #ifdef DEBUG_DISABLE if (krdc->type_flag & RDC_DISABLEPEND) { cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING after IO!"); } #endif if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf) break; if (IS_QSTATE(q, RDC_QDISABLEPEND)) { #ifdef DEBUG cmn_err(CE_NOTE, "!flusher thread death 2"); #endif break; } } thread_death: rdc_many_enter(krdc); mutex_enter(QLOCK(q)); group->rdc_thrnum--; group->rdc_writer = 0; if (aio && aio->qhandle) { aio->qhandle->sb_user--; if (aio->qhandle->sb_user == 0) { (void) _rdc_rsrv_diskq(krdc->group); rdc_fixlen(aio); (void) nsc_free_buf(aio->qhandle); aio->qhandle = NULL; aio->handle = NULL; _rdc_rlse_diskq(krdc->group); } } if ((group->count == 0) && (group->rdc_thrnum == 0)) { mutex_exit(QLOCK(q)); /* * Group now empty, so destroy * Race with remove_from_group while write thread was running */ #ifdef DEBUG cmn_err(CE_WARN, "!rdc_flush_diskq: group being destroyed"); #endif mutex_enter(&group->diskqmutex); rdc_close_diskq(group); mutex_exit(&group->diskqmutex); rdc_delgroup(group); krdc->group = NULL; rdc_many_exit(krdc); return; } mutex_exit(QLOCK(q)); rdc_many_exit(krdc); } /* * _rdc_remote_flush * Flush a single block ANON block * this function will flush from either the disk queue * or the memory queue. The appropriate locks must be * taken out etc, etc ... */ static void _rdc_remote_flush(rdc_aio_t *aio) { rdc_k_info_t *krdc = &rdc_k_info[aio->index]; rdc_u_info_t *urdc = &rdc_u_info[aio->index]; disk_queue *q = &krdc->group->diskq; kmutex_t *qlock; rdc_group_t *group; nsc_buf_t *h = NULL; int reserved = 0; int rtype = RDC_RAW; int rc; uint_t maxseq; struct netwriteres netret; int waitq = 1; int vflags; group = krdc->group; netret.vecdata.vecdata_val = NULL; netret.vecdata.vecdata_len = 0; /* Where did we get this aio from anyway? */ if (RDC_IS_DISKQ(group)) { qlock = &group->diskq.disk_qlock; } else { qlock = &group->ra_queue.net_qlock; } /* * quench transmission if we are too far ahead of the * server Q, or it will overflow. * Must fail all requests while asyncdis is set. * It will be cleared when the last thread to be discarded * sets the asyncstall counter to zero. * Note the thread within rdc_net_write * also bumps the asyncstall counter. */ mutex_enter(qlock); if (group->asyncdis) { aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } /* don't go to sleep if we have gone logging! */ vflags = rdc_get_vflags(urdc); if ((vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) { if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group)) aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } while (maxseq = group->seqack + RDC_MAXPENDQ + 1, maxseq = (maxseq < group->seqack) ? 
maxseq + RDC_NEWSEQ + 1 : maxseq, !RDC_INFRONT(aio->seq, maxseq)) { group->asyncstall++; ASSERT(!IS_STATE(urdc, RDC_LOGGING)); cv_wait(&group->asyncqcv, qlock); group->asyncstall--; ASSERT(group->asyncstall >= 0); if (group->asyncdis) { if (group->asyncstall == 0) { group->asyncdis = 0; } aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } /* * See if we have gone into logging mode * since sleeping. */ vflags = rdc_get_vflags(urdc); if (vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING)) { if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group)) aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } } mutex_exit(qlock); if ((krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) { mutex_enter(krdc->io_kstats->ks_lock); kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); waitq = 0; } rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL); if (rc != 0) { #ifdef DEBUG cmn_err(CE_WARN, "!_rdc_remote_flush: reserve, index %d, rc %d", aio->index, rc); #endif goto failed; } reserved = 1; /* * Case where we are multihop and calling with no ANON bufs * Need to do the read to fill the buf. */ if (!aio->handle) { rc = nsc_alloc_buf(RDC_U_FD(krdc), aio->pos, aio->len, (aio->flag & ~NSC_WRITE) | NSC_READ, &h); if (!RDC_SUCCESS(rc)) { #ifdef DEBUG cmn_err(CE_WARN, "!_rdc_remote_flush: alloc_buf, index %d, pos %" NSC_SZFMT ", len %" NSC_SZFMT ", rc %d", aio->index, aio->pos, aio->len, rc); #endif goto failed; } aio->handle = h; aio->handle->sb_user = RDC_NULLBUFREAD; } mutex_enter(qlock); if (group->asyncdis) { if (group->asyncstall == 0) { group->asyncdis = 0; } aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } group->asyncstall++; mutex_exit(qlock); if (krdc->remote_index < 0) { /* * this should be ok, we are flushing, not rev syncing. * remote_index could be -1 if we lost a race with * resume and the flusher trys to flush an io from * another set that has not resumed */ krdc->remote_index = rdc_net_state(krdc->index, CCIO_SLAVE); DTRACE_PROBE1(remote_index_negative, int, krdc->remote_index); } /* * double check for logging, no check in net_write() * skip the write if you can, otherwise, if logging * avoid clearing the bit .. you don't know whose bit it may * also be. 
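 * If the set has gone into logging or syncing mode the request is
 * cancelled here; the flusher will then dump the queue and the
 * blocks stay marked in the bitmap for a later resync.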
*/ if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) { aio->iostatus = RDC_IO_CANCELLED; mutex_enter(qlock); group->asyncstall--; mutex_exit(qlock); goto failed; } rc = rdc_net_write(krdc->index, krdc->remote_index, aio->handle, aio->pos, aio->len, aio->seq, aio->qpos, &netret); mutex_enter(qlock); group->asyncstall--; if (group->asyncdis) { if (group->asyncstall == 0) { group->asyncdis = 0; } aio->iostatus = RDC_IO_CANCELLED; mutex_exit(qlock); goto failed; } if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) { mutex_exit(qlock); aio->iostatus = RDC_IO_CANCELLED; goto failed; } ASSERT(aio->handle); if (rc != 0) { #ifdef DEBUG cmn_err(CE_WARN, "!_rdc_remote_flush: write, index %d, pos %" NSC_SZFMT ", len %" NSC_SZFMT ", " "rc %d seq %u group seq %u seqack %u qpos %" NSC_SZFMT, aio->index, aio->pos, aio->len, rc, aio->seq, group->seq, group->seqack, aio->qpos); #endif if (rc == ENOLINK) { cmn_err(CE_WARN, "!Hard timeout detected (%d sec) " "on SNDR set %s:%s", rdc_rpc_tmout, urdc->secondary.intf, urdc->secondary.file); } mutex_exit(qlock); goto failed; } else { aio->iostatus = RDC_IO_DONE; } if (RDC_IS_DISKQ(group)) { /* free locally alloc'd handle */ if (aio->handle->sb_user == RDC_NULLBUFREAD) { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } aio->qhandle->sb_user--; if (aio->qhandle->sb_user == 0) { (void) _rdc_rsrv_diskq(group); rdc_fixlen(aio); (void) nsc_free_buf(aio->qhandle); aio->qhandle = NULL; aio->handle = NULL; _rdc_rlse_diskq(group); } } else { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } mutex_exit(qlock); _rdc_rlse_devs(krdc, rtype); if (netret.result == 0) { vflags = rdc_get_vflags(urdc); if (!(vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) { RDC_CLR_BITMAP(krdc, aio->pos, aio->len, \ 0xffffffff, RDC_BIT_BUMP); if (RDC_IS_DISKQ(krdc->group)) { if (!IS_STATE(urdc, RDC_LOGGING)) { /* tell queue data has been flushed */ rdc_clr_iohdr(krdc, aio->qpos); } else { /* throw away queue, logging */ mutex_enter(qlock); rdc_dump_iohdrs(q); SET_QNXTIO(q, QHEAD(q)); SET_QCOALBOUNDS(q, QHEAD(q)); mutex_exit(qlock); } } } mutex_enter(qlock); /* * Check to see if the reply has arrived out of * order, if so don't update seqack. */ if (!RDC_INFRONT(aio->seq, group->seqack)) { group->seqack = aio->seq; } #ifdef DEBUG else { rdc_ooreply++; } #endif if (group->asyncstall) { cv_broadcast(&group->asyncqcv); } mutex_exit(qlock); } else if (netret.result < 0) { aio->iostatus = RDC_IO_FAILED; } /* * see if we have any pending async requests we can mark * as done. */ if (netret.vecdata.vecdata_len) { net_pendvec_t *vecp; net_pendvec_t *vecpe; vecp = netret.vecdata.vecdata_val; vecpe = netret.vecdata.vecdata_val + netret.vecdata.vecdata_len; while (vecp < vecpe) { rdc_k_info_t *krdcp = &rdc_k_info[vecp->pindex]; rdc_u_info_t *urdcp = &rdc_u_info[vecp->pindex]; /* * we must always still be in the same group. 
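 * (each entry in the pending vector refers back to a set in this
 * group, so the assert below is purely a sanity check).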
*/ ASSERT(krdcp->group == group); vflags = rdc_get_vflags(urdcp); if (!(vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) { RDC_CLR_BITMAP(krdcp, vecp->apos, vecp->alen, \ 0xffffffff, RDC_BIT_BUMP); if (RDC_IS_DISKQ(krdcp->group)) { if (!IS_STATE(urdc, RDC_LOGGING)) { /* update queue info */ rdc_clr_iohdr(krdc, vecp->qpos); } else { /* we've gone logging */ mutex_enter(qlock); rdc_dump_iohdrs(q); SET_QNXTIO(q, QHEAD(q)); SET_QCOALBOUNDS(q, QHEAD(q)); mutex_exit(qlock); } } } /* * see if we can re-start transmission */ mutex_enter(qlock); if (!RDC_INFRONT(vecp->seq, group->seqack)) { group->seqack = vecp->seq; } #ifdef DEBUG else { rdc_ooreply++; } #endif DTRACE_PROBE1(pendvec_return, int, vecp->seq); if (group->asyncstall) { cv_broadcast(&group->asyncqcv); } mutex_exit(qlock); vecp++; } } if (netret.vecdata.vecdata_val) kmem_free(netret.vecdata.vecdata_val, netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); return; failed: /* perhaps we have a few threads stuck .. */ if (group->asyncstall) { group->asyncdis = 1; cv_broadcast(&group->asyncqcv); } if (netret.vecdata.vecdata_val) kmem_free(netret.vecdata.vecdata_val, netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); mutex_enter(qlock); if (RDC_IS_DISKQ(group)) { /* free locally alloc'd hanlde */ if ((aio->handle) && (aio->handle->sb_user == RDC_NULLBUFREAD)) { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } aio->qhandle->sb_user--; if (aio->qhandle->sb_user == 0) { (void) _rdc_rsrv_diskq(group); rdc_fixlen(aio); (void) nsc_free_buf(aio->qhandle); aio->qhandle = NULL; aio->handle = NULL; _rdc_rlse_diskq(group); } } else { if (aio->handle) { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } } mutex_exit(qlock); if (reserved) { _rdc_rlse_devs(krdc, rtype); } if ((waitq && krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) { mutex_enter(krdc->io_kstats->ks_lock); kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats)); mutex_exit(krdc->io_kstats->ks_lock); } /* make sure that the bit is still set */ RDC_CHECK_BIT(krdc, aio->pos, aio->len); if (aio->iostatus != RDC_IO_CANCELLED) aio->iostatus = RDC_IO_FAILED; } /* * rdc_drain_disk_queue * drain the async network queue for the whole group. Bail out if nothing * happens in 20 sec * returns -1 if it bails before the queues are drained. */ #define NUM_RETRIES 15 /* Number of retries to wait if no progress */ int rdc_drain_disk_queue(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; volatile rdc_group_t *group; volatile disk_queue *diskq; int threads, counter; long blocks; /* Sanity checking */ if (index > rdc_max_sets) return (0); /* * If there is no group or diskq configured, we can leave now */ if (!(group = krdc->group) || !(diskq = &group->diskq)) return (0); /* * No need to wait if EMPTY and threads are gone */ counter = 0; while (!QEMPTY(diskq) || group->rdc_thrnum) { /* * Capture counters to determine if progress is being made */ blocks = QBLOCKS(diskq); threads = group->rdc_thrnum; /* * Wait */ delay(HZ); /* * Has the group or disk queue gone away while delayed? */ if (!(group = krdc->group) || !(diskq = &group->diskq)) return (0); /* * Are we still seeing progress? 
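 * Progress means the queue has drained some blocks or the number
 * of flusher threads has changed since the last one second sample;
 * after NUM_RETRIES consecutive samples with no change we give up
 * and return -1.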
*/ if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) { /* * No progress seen, increment retry counter */ if (counter++ > NUM_RETRIES) { return (-1); } } else { /* * Reset counter, as we've made progress */ counter = 0; } } return (0); } /* * decide what needs to be drained, disk or core * and drain it */ int rdc_drain_queue(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; rdc_group_t *group = krdc->group; if (!group) return (0); if (RDC_IS_DISKQ(group)) return (rdc_drain_disk_queue(index)); if (RDC_IS_MEMQ(group)) return (rdc_drain_net_queue(index)); /* oops.. */ #ifdef DEBUG cmn_err(CE_WARN, "!rdc_drain_queue: " "attempting drain of unknown Q type"); #endif return (0); } /* * rdc_drain_net_queue * drain the async network queue for the whole group. Bail out if nothing * happens in 20 sec * returns -1 if it bails before the queues are drained. */ int rdc_drain_net_queue(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; volatile net_queue *q; int bail = 20; /* bail out in about 20 secs */ nsc_size_t blocks; /* Sanity checking */ if (index > rdc_max_sets) return (0); if (!krdc->group) return (0); /* LINTED */ if (!(q = &krdc->group->ra_queue)) return (0); /* CONSTCOND */ while (1) { if (((volatile rdc_aio_t *)q->net_qhead == NULL) && (krdc->group->rdc_thrnum == 0)) { break; } blocks = q->blocks; q = (volatile net_queue *)&krdc->group->ra_queue; if ((blocks == q->blocks) && (--bail <= 0)) { break; } delay(HZ); } if (bail <= 0) return (-1); return (0); } /* * rdc_dump_queue * We want to release all the blocks currently on the network flushing queue * We already have them logged in the bitmap. */ void rdc_dump_queue(int index) { rdc_k_info_t *krdc = &rdc_k_info[index]; rdc_aio_t *aio; net_queue *q; rdc_group_t *group; disk_queue *dq; kmutex_t *qlock; group = krdc->group; q = &group->ra_queue; dq = &group->diskq; /* * gotta have both locks here for diskq */ if (RDC_IS_DISKQ(group)) { mutex_enter(&q->net_qlock); if (q->qfill_sleeping == RDC_QFILL_AWAKE) { int tries = 3; #ifdef DEBUG_DISKQ cmn_err(CE_NOTE, "!dumpq sending diskq->memq flusher to sleep"); #endif q->qfflags |= RDC_QFILLSLEEP; mutex_exit(&q->net_qlock); while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--) delay(5); mutex_enter(&q->net_qlock); } } if (RDC_IS_DISKQ(group)) { qlock = &dq->disk_qlock; (void) _rdc_rsrv_diskq(group); } else { qlock = &q->net_qlock; } mutex_enter(qlock); group->seq = RDC_NEWSEQ; /* reset the sequence number */ group->seqack = RDC_NEWSEQ; /* if the q is on disk, dump the q->iohdr chain */ if (RDC_IS_DISKQ(group)) { rdc_dump_iohdrs(dq); /* back up the nxtio pointer */ SET_QNXTIO(dq, QHEAD(dq)); SET_QCOALBOUNDS(dq, QHEAD(dq)); } while (q->net_qhead) { rdc_k_info_t *tmpkrdc; aio = q->net_qhead; tmpkrdc = &rdc_k_info[aio->index]; if (RDC_IS_DISKQ(group)) { aio->qhandle->sb_user--; if (aio->qhandle->sb_user == 0) { rdc_fixlen(aio); (void) nsc_free_buf(aio->qhandle); aio->qhandle = NULL; aio->handle = NULL; } } else { if (aio->handle) { (void) nsc_free_buf(aio->handle); aio->handle = NULL; } } q->net_qhead = aio->next; RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len); kmem_free(aio, sizeof (*aio)); if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(group)) { mutex_enter(tmpkrdc->io_kstats->ks_lock); kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats)); mutex_exit(tmpkrdc->io_kstats->ks_lock); } } q->net_qtail = NULL; q->blocks = 0; q->nitems = 0; /* * See if we have stalled threads. 
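 * Threads blocked in _rdc_remote_flush() waiting for sequence
 * acknowledgements are woken with asyncdis set, so they cancel
 * their requests instead of sleeping forever on a queue that has
 * just been dumped.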
*/ done: if (group->asyncstall) { group->asyncdis = 1; cv_broadcast(&group->asyncqcv); } mutex_exit(qlock); if (RDC_IS_DISKQ(group)) { mutex_exit(&q->net_qlock); _rdc_rlse_diskq(group); } } /* * rdc_clnt_get * Get a CLIENT handle and cache it */ static int rdc_clnt_get(rdc_srv_t *svp, rpcvers_t vers, struct chtab **rch, CLIENT **clp) { uint_t max_msgsize; int retries; int ret; struct cred *cred; int num_clnts = 0; register struct chtab *ch; struct chtab **plistp; CLIENT *client = 0; if (rch) { *rch = 0; } if (clp) { *clp = 0; } retries = 6; /* Never used for COTS in Solaris */ cred = ddi_get_cred(); max_msgsize = RDC_RPC_MAX; mutex_enter(&rdc_clnt_lock); ch = rdc_chtable; plistp = &rdc_chtable; /* find the right ch_list chain */ for (ch = rdc_chtable; ch != NULL; ch = ch->ch_next) { if (ch->ch_prog == RDC_PROGRAM && ch->ch_vers == vers && ch->ch_dev == svp->ri_knconf->knc_rdev && ch->ch_protofmly != NULL && strcmp(ch->ch_protofmly, svp->ri_knconf->knc_protofmly) == 0) { /* found the correct chain to walk */ break; } plistp = &ch->ch_next; } if (ch != NULL) { /* walk the ch_list and try and find a free client */ for (num_clnts = 0; ch != NULL; ch = ch->ch_list, num_clnts++) { if (ch->ch_inuse == FALSE) { /* suitable handle to reuse */ break; } plistp = &ch->ch_list; } } if (ch == NULL && num_clnts >= MAXCLIENTS) { /* alloc a temporary handle and return */ rdc_clnt_toomany++; mutex_exit(&rdc_clnt_lock); ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr), RDC_PROGRAM, vers, max_msgsize, retries, cred, &client); if (ret != 0) { cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret); return (ret); } *rch = 0; *clp = client; (void) CLNT_CONTROL(client, CLSET_PROGRESS, NULL); return (ret); } if (ch != NULL) { /* reuse a cached handle */ ch->ch_inuse = TRUE; ch->ch_timesused++; mutex_exit(&rdc_clnt_lock); *rch = ch; if (ch->ch_client == NULL) { ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr), RDC_PROGRAM, vers, max_msgsize, retries, cred, &ch->ch_client); if (ret != 0) { ch->ch_inuse = FALSE; return (ret); } (void) CLNT_CONTROL(ch->ch_client, CLSET_PROGRESS, NULL); *clp = ch->ch_client; return (0); } else { /* * Consecutive calls to CLNT_CALL() on the same client handle * get the same transaction ID. We want a new xid per call, * so we first reinitialise the handle. 
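 * clnt_tli_kinit() rebinds the cached handle to the server address,
 * which gives the call a fresh xid without destroying and
 * recreating the transport handle.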
			(void) clnt_tli_kinit(ch->ch_client, svp->ri_knconf,
			    &(svp->ri_addr), max_msgsize, retries, cred);
			*clp = ch->ch_client;
			return (0);
		}
	}

	/* create new handle and cache it */
	ch = (struct chtab *)kmem_zalloc(sizeof (*ch), KM_SLEEP);

	if (ch) {
		ch->ch_inuse = TRUE;
		ch->ch_prog = RDC_PROGRAM;
		ch->ch_vers = vers;
		ch->ch_dev = svp->ri_knconf->knc_rdev;
		ch->ch_protofmly = (char *)kmem_zalloc(
		    strlen(svp->ri_knconf->knc_protofmly)+1, KM_SLEEP);
		if (ch->ch_protofmly)
			(void) strcpy(ch->ch_protofmly,
			    svp->ri_knconf->knc_protofmly);
		*plistp = ch;
	}

	mutex_exit(&rdc_clnt_lock);

	ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
	    RDC_PROGRAM, vers, max_msgsize, retries, cred, clp);

	if (ret != 0) {
		if (ch)
			ch->ch_inuse = FALSE;
		cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret);
		return (ret);
	}

	*rch = ch;
	if (ch)
		ch->ch_client = *clp;

	(void) CLNT_CONTROL(*clp, CLSET_PROGRESS, NULL);

	return (ret);
}

long rdc_clnt_count = 0;

/*
 * rdc_clnt_call
 * Arguments:
 *	rdc_srv_t *svp		- rdc servinfo
 *	rpcproc_t proc		- rpc procedure id
 *	rpcvers_t vers		- protocol version
 *	xdrproc_t xargs		- xdr function for the arguments
 *	caddr_t argsp		- arguments encoded by xargs
 *	xdrproc_t xres		- xdr function for the results
 *	caddr_t resp		- buffer the results are decoded into
 *	struct timeval timeout	- per-call timeout
 * Performs an RPC client call using the specified protocol and version.
 */
int
rdc_clnt_call(rdc_srv_t *svp, rpcproc_t proc, rpcvers_t vers,
    xdrproc_t xargs, caddr_t argsp, xdrproc_t xres, caddr_t resp,
    struct timeval *timeout)
{
	CLIENT *rh = NULL;
	int err;
	int tries = 0;
	struct chtab *ch = NULL;

	err = rdc_clnt_get(svp, vers, &ch, &rh);
	if (err || !rh)
		return (err);

	do {
		DTRACE_PROBE3(rdc_clnt_call_1, CLIENT *, rh,
		    rpcproc_t, proc, xdrproc_t, xargs);

		err = cl_call_sig(rh, proc, xargs, argsp, xres, resp,
		    *timeout);

		DTRACE_PROBE1(rdc_clnt_call_end, int, err);

		switch (err) {
			case RPC_SUCCESS:
				/* bail now */
				goto done;
			case RPC_INTR:
				/* No recovery from this */
				goto done;
			case RPC_PROGVERSMISMATCH:
				goto done;
			case RPC_TLIERROR:
				/* fall thru */
			case RPC_XPRTFAILED:
				/* Delay here to err on side of caution */
				/* fall thru */
			case RPC_VERSMISMATCH:
			default:
				if (IS_UNRECOVERABLE_RPC(err)) {
					goto done;
				}
				tries++;
				/*
				 * The call is in progress (over COTS)
				 * Try the CLNT_CALL again, but don't
				 * print a noisy error message
				 */
				if (err == RPC_INPROGRESS)
					break;

				cmn_err(CE_NOTE, "!SNDR client: err %d %s",
				    err, clnt_sperrno(err));
		}
	} while (tries && (tries < 2));
done:
	++rdc_clnt_count;
	rdc_clnt_free(ch, rh);
	return (err);
}

/*
 * Call an rpc from the client side, not caring which protocol is used.
 */
int
rdc_clnt_call_any(rdc_srv_t *svp, rdc_if_t *ip, rpcproc_t proc,
    xdrproc_t xargs, caddr_t argsp, xdrproc_t xres, caddr_t resp,
    struct timeval *timeout)
{
	rpcvers_t vers;
	int rc;

	if (ip != NULL) {
		vers = ip->rpc_version;
	} else {
		vers = RDC_VERS_MAX;
	}

	do {
		rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
		    xres, resp, timeout);

		if (rc == RPC_PROGVERSMISMATCH) {
			/*
			 * Downgrade and try again.
			 */
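			/*
			 * The server does not support this program version;
			 * step down one version and retry, stopping once we
			 * drop below RDC_VERS_MIN.
			 */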
			vers--;
		}
	} while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));

	if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
		mutex_enter(&rdc_ping_lock);
		ip->rpc_version = vers;
		mutex_exit(&rdc_ping_lock);
	}

	return (rc);
}

/*
 * Call an rpc from the client side, starting with protocol specified
 */
int
rdc_clnt_call_walk(rdc_k_info_t *krdc, rpcproc_t proc, xdrproc_t xargs,
    caddr_t argsp, xdrproc_t xres, caddr_t resp, struct timeval *timeout)
{
	int rc;
	rpcvers_t vers;
	rdc_srv_t *svp = krdc->lsrv;
	rdc_if_t *ip = krdc->intf;
	vers = krdc->rpc_version;

	do {
		rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
		    xres, resp, timeout);

		if (rc == RPC_PROGVERSMISMATCH) {
			/*
			 * Downgrade and try again.
			 */
			vers--;
		}
	} while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));

	if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
		mutex_enter(&rdc_ping_lock);
		ip->rpc_version = vers;
		mutex_exit(&rdc_ping_lock);
	}

	return (rc);
}

/*
 * rdc_clnt_free
 * Free a client structure into the cache, or if this was a temporary
 * handle allocated above MAXCLIENTS, destroy it.
 */
static void
rdc_clnt_free(struct chtab *ch, CLIENT *clp)
{
	if (ch != NULL) {
		/* cached client, just clear inuse flag and return */
		ASSERT(ch->ch_client == clp);
		ch->ch_inuse = FALSE;
		return;
	}

	/* temporary handle allocated above MAXCLIENTS, so destroy it */
	if (clp->cl_auth) {
		AUTH_DESTROY(clp->cl_auth);
		clp->cl_auth = 0;
	}

	CLNT_DESTROY(clp);
}

/*
 * _rdc_clnt_destroy
 * Free a chain (ch_list or ch_next) of cached clients
 */
static int
_rdc_clnt_destroy(struct chtab **p, const int list)
{
	struct chtab *ch;
	int leak = 0;

	if (!p)
		return (0);

	while (*p != NULL) {
		ch = *p;

		/*
		 * unlink from the chain
		 * - this leaks the client if it was inuse
		 */
		*p = list ? ch->ch_list : ch->ch_next;

		if (!ch->ch_inuse) {
			/* unused client - destroy it */
			if (ch->ch_client) {
				if (ch->ch_client->cl_auth) {
					AUTH_DESTROY(ch->ch_client->cl_auth);
					ch->ch_client->cl_auth = 0;
				}

				CLNT_DESTROY(ch->ch_client);
				ch->ch_client = 0;
			}

			if (ch->ch_protofmly)
				kmem_free(ch->ch_protofmly,
				    strlen(ch->ch_protofmly)+1);

			kmem_free(ch, sizeof (*ch));
		} else {
			/* remember client leak */
			leak++;
		}
	}

	return (leak);
}

/*
 * rdc_clnt_destroy
 * Free client caching table on unconfigure
 */
void
rdc_clnt_destroy(void)
{
	struct chtab *ch;
	int leak = 0;

	mutex_enter(&rdc_clnt_lock);

	/* destroy each ch_list chain */
	for (ch = rdc_chtable; ch; ch = ch->ch_next) {
		leak += _rdc_clnt_destroy(&ch->ch_list, 1);
	}

	/* destroy the main ch_next chain */
	leak += _rdc_clnt_destroy(&rdc_chtable, 0);

	if (leak) {
		/* we are about to leak clients */
		cmn_err(CE_WARN,
		    "!rdc_clnt_destroy: leaking %d inuse clients", leak);
	}

	mutex_exit(&rdc_clnt_lock);
}

#ifdef DEBUG
/*
 * Function to send an asynchronous net_data6 request
 * direct to a server to allow the generation of
 * out of order requests for ZatoIchi tests.
 */
int
rdc_async6(void *arg, int mode, int *rvp)
{
	int index;
	rdc_async6_t async6;
	struct net_data6 data6;
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	char *data;
	int datasz;
	char *datap;
	int rc;
	struct timeval t;
	struct netwriteres netret;
	int i;

	rc = 0;
	*rvp = 0;
	/*
	 * copyin the user's arguments.
	 */
	if (ddi_copyin(arg, &async6, sizeof (async6), mode) < 0) {
		return (EFAULT);
	}

	/*
	 * search by the secondary host and file.
	 */
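	/*
	 * Walk every configured set under rdc_conf_lock and pick the first
	 * enabled, asynchronous set (protocol version 6 or later) whose
	 * secondary host and volume match the caller's request.
	 */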
	mutex_enter(&rdc_conf_lock);
	for (index = 0; index < rdc_max_sets; index++) {
		urdc = &rdc_u_info[index];
		krdc = &rdc_k_info[index];

		if (!IS_CONFIGURED(krdc))
			continue;
		if (!IS_ENABLED(urdc))
			continue;
		if (!IS_ASYNC(urdc))
			continue;
		if (krdc->rpc_version < RDC_VERSION6)
			continue;
		if ((strncmp(urdc->secondary.intf, async6.sechost,
		    MAX_RDC_HOST_SIZE) == 0) &&
		    (strncmp(urdc->secondary.file, async6.secfile,
		    NSC_MAXPATH) == 0)) {
			break;
		}
	}
	mutex_exit(&rdc_conf_lock);
	if (index >= rdc_max_sets) {
		return (ENOENT);
	}

	if (async6.spos != -1) {
		if ((async6.spos < async6.pos) ||
		    ((async6.spos + async6.slen) >
		    (async6.pos + async6.len))) {
			cmn_err(CE_WARN, "!Sub task not within range "
			    "start %d length %d sub start %d sub length %d",
			    async6.pos, async6.len, async6.spos, async6.slen);
			return (EIO);
		}
	}

	datasz = FBA_SIZE(1);
	data = kmem_alloc(datasz, KM_SLEEP);
	datap = data;

	while (datap < &data[datasz]) {
		/* LINTED */
		*datap++ = async6.pat;
	}

	/*
	 * Fill in the net databuffer prior to transmission.
	 */
	data6.local_cd = krdc->index;
	if (krdc->remote_index == -1) {
		cmn_err(CE_WARN, "!Remote index not known");
		kmem_free(data, datasz);
		return (EIO);
	} else {
		data6.cd = krdc->remote_index;
	}
	data6.pos = async6.pos;
	data6.len = async6.len;
	data6.flag = 0;
	data6.idx = async6.idx;
	data6.seq = async6.seq;

	if (async6.spos == -1) {
		data6.sfba = async6.pos;
		data6.nfba = async6.len;
		data6.endoblk = 1;
	} else {
		data6.sfba = async6.spos;
		data6.nfba = async6.slen;
		data6.endoblk = async6.endind;
	}

	data6.data.data_len = datasz;
	data6.data.data_val = data;

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

	netret.vecdata.vecdata_val = NULL;
	netret.vecdata.vecdata_len = 0;

	rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
	    xdr_net_data6, (char *)&data6, xdr_netwriteres, (char *)&netret,
	    &t);

	kmem_free(data, datasz);

	if (rc == 0) {
		if (netret.result < 0) {
			rc = -netret.result;
		}
		cmn_err(CE_NOTE, "!async6: seq %u result %d index %d "
		    "pendcnt %d", netret.seq, netret.result, netret.index,
		    netret.vecdata.vecdata_len);
		for (i = 0; i < netret.vecdata.vecdata_len; i++) {
			net_pendvec_t pvec;
			bcopy(netret.vecdata.vecdata_val + i, &pvec,
			    sizeof (net_pendvec_t));
			cmn_err(CE_NOTE, "!Seq %u pos %llu len %llu",
			    pvec.seq, (unsigned long long)pvec.apos,
			    (unsigned long long)pvec.alen);
		}
		if (netret.vecdata.vecdata_val)
			kmem_free(netret.vecdata.vecdata_val,
			    netret.vecdata.vecdata_len *
			    sizeof (net_pendvec_t));
	} else {
		cmn_err(CE_NOTE, "!async6: rpc call failed %d", rc);
	}
	*rvp = netret.index;
	return (rc);
}

/*
 * Function to send a net_read6 request
 * direct to a server to allow the generation of
 * read requests.
 */
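/*
 * DEBUG-only test hook: it copies in an rdc_readgen_t request, locates the
 * set by secondary host and device, issues RDCPROC_READ5 or RDCPROC_READ6
 * directly via rdc_clnt_call(), and copies any returned data back out to
 * the caller's buffer.
 *
 * A minimal caller sketch (illustration only; the host and device names are
 * made up, and the ioctl plumbing that reaches this function is not shown
 * here):
 *
 *	rdc_readgen_t rg;
 *	bzero(&rg, sizeof (rg));
 *	(void) strncpy(rg.sechost, "secondary-host", MAX_RDC_HOST_SIZE);
 *	(void) strncpy(rg.secfile, "/dev/rdsk/c0t0d0s0", NSC_MAXPATH);
 *	rg.rpcversion = 6;
 *	rg.pos = 0;
 *	rg.len = 1;
 *	rg.idx = 0;
 *	rg.flag = RDC_RREAD_START;
 */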
int
rdc_readgen(void *arg, int mode, int *rvp)
{
	int index;
	rdc_readgen_t readgen;
	rdc_readgen32_t readgen32;
	struct rread6 read6;
	struct rread read5;
	rdc_k_info_t *krdc;
	int ret;
	struct timeval t;
	struct rdcrdresult rr;
	int err;

	*rvp = 0;
	rr.rr_bufsize = 0;	/* rpc data buffer length (bytes) */
	rr.rr_data = NULL;	/* rpc data buffer */

	if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) {
		if (ddi_copyin(arg, &readgen32, sizeof (readgen32), mode)) {
			return (EFAULT);
		}
		(void) strncpy(readgen.sechost, readgen32.sechost,
		    MAX_RDC_HOST_SIZE);
		(void) strncpy(readgen.secfile, readgen32.secfile,
		    NSC_MAXPATH);
		readgen.len = readgen32.len;
		readgen.pos = readgen32.pos;
		readgen.idx = readgen32.idx;
		readgen.flag = readgen32.flag;
		readgen.data = (void *)(unsigned long)readgen32.data;
		readgen.rpcversion = readgen32.rpcversion;
	} else {
		if (ddi_copyin(arg, &readgen, sizeof (readgen), mode)) {
			return (EFAULT);
		}
	}

	switch (readgen.rpcversion) {
		case 5:
		case 6:
			break;
		default:
			return (EINVAL);
	}

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byhostdev(readgen.sechost, readgen.secfile);
	if (index >= 0) {
		krdc = &rdc_k_info[index];
	}
	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
		mutex_exit(&rdc_conf_lock);
		return (ENODEV);
	}
	/*
	 * we should really call setbusy here.
	 */
	mutex_exit(&rdc_conf_lock);

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;
	if (krdc->remote_index == -1) {
		cmn_err(CE_WARN, "!Remote index not known");
		ret = EIO;
		goto out;
	}

	if (readgen.rpcversion == 6) {
		read6.cd = krdc->remote_index;
		read6.len = readgen.len;
		read6.pos = readgen.pos;
		read6.idx = readgen.idx;
		read6.flag = readgen.flag;
	} else {
		read5.cd = krdc->remote_index;
		read5.len = readgen.len;
		read5.pos = readgen.pos;
		read5.idx = readgen.idx;
		read5.flag = readgen.flag;
	}

	if (readgen.flag & RDC_RREAD_START) {
		if (readgen.rpcversion == 6) {
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
			    RDC_VERSION6, xdr_rread6, (char *)&read6,
			    xdr_int, (char *)&ret, &t);
		} else {
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
			    RDC_VERSION5, xdr_rread, (char *)&read5,
			    xdr_int, (char *)&ret, &t);
		}
		if (err == 0) {
			*rvp = ret;
			ret = 0;
		} else {
			ret = EPROTO;
		}
	} else {
		if (readgen.rpcversion == 6) {
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
			    RDC_VERSION6, xdr_rread6, (char *)&read6,
			    xdr_rdresult, (char *)&rr, &t);
		} else {
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
			    RDC_VERSION5, xdr_rread, (char *)&read5,
			    xdr_rdresult, (char *)&rr, &t);
		}
		if (err == 0) {
			if (rr.rr_status != RDC_OK) {
				ret = EIO;
				goto out;
			}
			*rvp = rr.rr_bufsize;
			if (ddi_copyout(rr.rr_data, readgen.data,
			    rr.rr_bufsize, mode) != 0) {
				ret = EFAULT;
				goto out;
			}
			ret = 0;
		} else {
			ret = EPROTO;
			goto out;
		}
	}
out:
	if (rr.rr_data) {
		kmem_free(rr.rr_data, rr.rr_bufsize);
	}
	return (ret);
}
#endif