/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/* Network data replicator Client side */


#include <sys/types.h>
#include <sys/debug.h>
#include <sys/ksynch.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/cred.h>
#include <sys/byteorder.h>
#include <sys/errno.h>

#ifdef _SunOS_2_6
/*
 * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
 * define enum_t here as it is all we need from rpc/types.h
 * anyway and make it look like we included it. Yuck.
 */
#define	_RPC_TYPES_H
typedef int enum_t;
#else
#ifndef DS_DDICT
#include <rpc/types.h>
#endif
#endif /* _SunOS_2_6 */

#ifndef DS_DDICT
#include <rpc/auth.h>
#include <rpc/svc.h>
#include <rpc/xdr.h>
#endif
#include <sys/ddi.h>

#include <sys/nsc_thread.h>
#ifdef DS_DDICT
#include <sys/nsctl/contract.h>
#endif
#include <sys/nsctl/nsctl.h>

#include <sys/sdt.h>		/* dtrace is S10 or later */

#include "rdc_io.h"
#include "rdc_clnt.h"
#include "rdc_bitmap.h"
#include "rdc_diskq.h"


kmutex_t rdc_clnt_lock;

#ifdef DEBUG
int noflush = 0;
#endif

int rdc_rpc_tmout = RDC_CLNT_TMOUT;
static void rdc_clnt_free(struct chtab *, CLIENT *);
static void _rdc_remote_flush(rdc_aio_t *);

void rdc_flush_memq(int index);
void rdc_flush_diskq(int index);
int rdc_drain_net_queue(int index);
void rdc_flusher_thread(int index);
int  rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *);
void rdc_init_diskq_header(rdc_group_t *grp, dqheader *hd);
void rdc_dump_iohdrs(disk_queue *dq);
rdc_aio_t *rdc_dequeue(rdc_k_info_t *krdc, int *rc);
void rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_off_t qpos);
void rdc_close_diskq(rdc_group_t *krdc);

int rdc_writer(int index);

static struct chtab *rdc_chtable = NULL;
static int rdc_clnt_toomany;
#ifdef DEBUG
static int rdc_ooreply;
#endif

extern void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag);
extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

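/*
 * cl_call_sig
 * Wrapper around the client handle's cl_call op that blocks signal
 * delivery (sigintr/sigunintr) for the duration of the RPC, so the
 * call cannot be interrupted part way through a transfer.
 */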
static enum clnt_stat
cl_call_sig(struct __client *rh, rpcproc_t proc,
	    xdrproc_t xargs, caddr_t argsp, xdrproc_t xres,
	    caddr_t resp, struct timeval secs)
{
	enum clnt_stat stat;
	k_sigset_t smask;
	sigintr(&smask, 0);
	rh->cl_nosignal = TRUE;
	stat = ((*(rh)->cl_ops->cl_call)
	    (rh, proc, xargs, argsp, xres, resp, secs));
	rh->cl_nosignal = FALSE;
	sigunintr(&smask);
	return (stat);
}

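/*
 * rdc_net_getsize
 * Return the size (in FBAs) of the remote volume for this set.
 * Protocol versions 5 and below only carry a 32 bit size
 * (RDCPROC_GETSIZE); later versions use the 64 bit
 * RDCPROC_GETSIZE6 variant.
 */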
int
rdc_net_getsize(int index, uint64_t *sizeptr)
{
	struct timeval t;
	int err, size;
	rdc_k_info_t *krdc = &rdc_k_info[index];
	int remote_index = krdc->remote_index;

	*sizeptr = 0;
	if (krdc->remote_index < 0)
		return (EINVAL);

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

#ifdef DEBUG
	if (krdc->intf == NULL)
		cmn_err(CE_WARN,
		    "!rdc_net_getsize: null intf for index %d", index);
#endif
	if (krdc->rpc_version <= RDC_VERSION5) {
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE,
		    krdc->rpc_version, xdr_int, (char *)&remote_index,
		    xdr_int, (char *)&size, &t);
		if (err == 0)
			*sizeptr = size;
	} else {
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE6,
		    krdc->rpc_version, xdr_int, (char *)&remote_index,
		    xdr_u_longlong_t, (char *)sizeptr, &t);
	}
	return (err);
}


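/*
 * rdc_net_state
 * Send a set-state request (RDCPROC_STATE) for this set to the remote
 * server and return the remote index it answers with, or -1 on error.
 * Pre-version 7 protocols use the fixed-size set_state4 structure,
 * later versions the netbuf based set_state.
 */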
int
rdc_net_state(int index, int options)
{
	struct timeval t;
	int err;
	int remote_index = -1;
	rdc_u_info_t *urdc = &rdc_u_info[index];
	rdc_k_info_t *krdc = &rdc_k_info[index];
	struct set_state s;
	struct set_state4 s4;
	char neta[32], rneta[32];
	unsigned short *sp;

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

	if (krdc->rpc_version < RDC_VERSION7) {
		s4.netaddrlen = urdc->primary.addr.len;
		s4.rnetaddrlen = urdc->secondary.addr.len;
		bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen);
		bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen);
		(void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN);
		(void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN);
		s4.flag = options;

		err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
		    krdc->rpc_version, xdr_set_state4, (char *)&s4, xdr_int,
		    (char *)&remote_index, &t);
	} else {
		s.netaddrlen = urdc->primary.addr.len;
		s.rnetaddrlen = urdc->secondary.addr.len;
		s.netaddr.buf = neta;
		s.rnetaddr.buf = rneta;
		bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
		bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
		s.netaddr.len = urdc->primary.addr.len;
		s.rnetaddr.len = urdc->secondary.addr.len;
		s.netaddr.maxlen = urdc->primary.addr.len;
		s.rnetaddr.maxlen = urdc->secondary.addr.len;
		sp = (unsigned short *)s.netaddr.buf;
		*sp = htons(*sp);
		sp = (unsigned short *)s.rnetaddr.buf;
		*sp = htons(*sp);
		s.pfile = urdc->primary.file;
		s.sfile = urdc->secondary.file;
		s.flag = options;

		err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
		    krdc->rpc_version, xdr_set_state, (char *)&s, xdr_int,
		    (char *)&remote_index, &t);
	}

	if (err)
		return (-1);
	else
		return (remote_index);
}


/*
 * rdc_net_getbmap
 * get the bitmap from the remote side and OR it into the local bitmap
 */
int
rdc_net_getbmap(int index, int size)
{
	struct timeval t;
	int err;
	struct bmap b;
	struct bmap6 b6;
	rdc_k_info_t *krdc;

	krdc = &rdc_k_info[index];

	if (krdc->remote_index < 0)
		return (EINVAL);

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;
#ifdef DEBUG
	if (krdc->intf == NULL)
		cmn_err(CE_WARN,
		    "!rdc_net_getbmap: null intf for index %d", index);
#endif

	if (krdc->rpc_version <= RDC_VERSION5) {
		b.cd = krdc->remote_index;
		b.dual = index;
		b.size = size;
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP,
		    krdc->rpc_version, xdr_bmap, (char *)&b, xdr_int,
		    (char *)&err, &t);

	} else {
		b6.cd = krdc->remote_index;
		b6.dual = index;
		b6.size = size;
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP6,
		    krdc->rpc_version, xdr_bmap6, (char *)&b6, xdr_int,
		    (char *)&err, &t);
	}
	return (err);
}

int sndr_proto = 0;

/*
 * return state corresponding to rdc_host
 */
int
rdc_net_getstate(rdc_k_info_t *krdc, int *serial_mode, int *use_mirror,
    int *mirror_down, int network)
{
	int err;
	struct timeval t;
	int state;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	struct set_state s;
#ifdef sparc
	struct set_state4 s4;
#endif
	char neta[32];
	char rneta[32];
	unsigned short *sp;
	char *setp = (char *)&s;
	xdrproc_t xdr_proc = xdr_set_state;

	if (krdc->lsrv && (krdc->intf == NULL || krdc->intf->if_down) &&
	    network) /* fail fast */
		return (-1);

	s.netaddrlen = urdc->primary.addr.len;
	s.rnetaddrlen = urdc->secondary.addr.len;
	s.pfile = urdc->primary.file;
	s.sfile = urdc->secondary.file;
	s.netaddr.buf = neta;
	s.rnetaddr.buf = rneta;
	bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
	bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
	sp = (unsigned short *) s.netaddr.buf;
	*sp = htons(*sp);
	sp = (unsigned short *) s.rnetaddr.buf;
	*sp = htons(*sp);
	s.netaddr.len = urdc->primary.addr.len;
	s.rnetaddr.len = urdc->secondary.addr.len;
	s.netaddr.maxlen = urdc->primary.addr.maxlen;
	s.rnetaddr.maxlen = urdc->secondary.addr.maxlen;
	s.flag = 0;

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

	if (sndr_proto)
		krdc->rpc_version = sndr_proto;
	else
		krdc->rpc_version = RDC_VERS_MAX;

again:
	err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSTATE4, krdc->rpc_version,
	    xdr_proc, setp, xdr_int, (char *)&state, &t);

	if (err == RPC_PROGVERSMISMATCH && (krdc->rpc_version !=
	    RDC_VERS_MIN)) {
		if (krdc->rpc_version-- == RDC_VERSION7) {
			/* set_state struct changed with v7 of protocol */
#ifdef sparc
			s4.netaddrlen = urdc->primary.addr.len;
			s4.rnetaddrlen = urdc->secondary.addr.len;
			bcopy(urdc->primary.addr.buf, s4.netaddr,
			    s4.netaddrlen);
			bcopy(urdc->secondary.addr.buf, s4.rnetaddr,
			    s4.rnetaddrlen);
			(void) strncpy(s4.pfile, urdc->primary.file,
			    RDC_MAXNAMLEN);
			(void) strncpy(s4.sfile, urdc->secondary.file,
			    RDC_MAXNAMLEN);
			s4.flag = 0;
			xdr_proc = xdr_set_state4;
			setp = (char *)&s4;
#else
			/* x64 cannot use protocols < 7 */
			return (-1);
#endif
		}
		goto again;
	}
#ifdef DEBUG
	cmn_err(CE_NOTE, "!sndr get_state: Protocol ver %d", krdc->rpc_version);
#endif

	if (err) {
		return (-1);
	}

	if (state == -1)
		return (-1);

	if (serial_mode)
		*serial_mode = (state >> 2) & 1;
	if (use_mirror)
		*use_mirror = (state >> 1) & 1;
	if (mirror_down)
		*mirror_down = state & 1;

	return (0);
}


static struct xdr_discrim rdres_discrim[2] = {
	{ (int)RDC_OK, xdr_readok },
	{ __dontcare__, NULL_xdrproc_t }
};


/*
 * Reply from remote read (client side)
 */
static bool_t
xdr_rdresult(XDR *xdrs, readres *rr)
{

	return (xdr_union(xdrs, (enum_t *)&(rr->rr_status),
	    (caddr_t)&(rr->rr_ok), rdres_discrim, xdr_void));
}

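/*
 * rdc_rrstatus_decode
 * Map a remote read status (RDCERR_*) onto a local errno value.
 */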
static int
rdc_rrstatus_decode(int status)
{
	int ret = 0;

	if (status != RDC_OK) {
		switch (status) {
		case RDCERR_NOENT:
			ret = ENOENT;
			break;
		case RDCERR_NOMEM:
			ret = ENOMEM;
			break;
		default:
			ret = EIO;
			break;
		}
	}

	return (ret);
}


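/*
 * rdc_net_read
 * Read fba_len blocks at fba_pos from the remote volume into the
 * given buffer handle. Attempts direct FCAL I/O through remote_fd
 * first, then falls back to the RPC path: a setup call, followed by
 * data transfer calls chunked by maxfbas and the handle's vector
 * boundaries, with the final call telling the server to clean up.
 */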
int
rdc_net_read(int local_index, int remote_index, nsc_buf_t *handle,
    nsc_off_t fba_pos, nsc_size_t fba_len)
{
	struct rdcrdresult rr;
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	struct rread list;
	struct rread6 list6;
	struct timeval t;
	uchar_t *sv_addr;
	nsc_vec_t *vec;
	int rpc_flag;
	nsc_size_t sv_len;
	int err;
	int ret;
	nsc_size_t len;
	nsc_size_t maxfbas;
	int transflag;

	if (handle == NULL)
		return (EINVAL);

	if (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len)) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!rdc_net_read: handle bounds");
#endif
		return (EINVAL);
	}

	krdc = &rdc_k_info[local_index];
	urdc = &rdc_u_info[local_index];

	maxfbas = MAX_RDC_FBAS;

	if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
		nsc_buf_t *remote_h = NULL;
		int reserved = 0;

		ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
		if (RDC_SUCCESS(ret)) {
			reserved = 1;
			ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
			    NSC_RDBUF, &remote_h);
		}
		if (RDC_SUCCESS(ret)) {
			ret = nsc_copy(remote_h, handle, fba_pos, fba_pos,
			    fba_len);
			if (RDC_SUCCESS(ret)) {
				(void) nsc_free_buf(remote_h);
				nsc_release(krdc->remote_fd);
				return (0);
			}
		}
		rdc_group_enter(krdc);
		rdc_set_flags(urdc, RDC_FCAL_FAILED);
		rdc_group_exit(krdc);
		if (remote_h)
			(void) nsc_free_buf(remote_h);
		if (reserved)
			nsc_release(krdc->remote_fd);
	}

	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

	if (rdc_get_vflags(urdc) & RDC_VOL_FAILED)
		rpc_flag = RDC_RREAD_FAIL;
	else
		rpc_flag = 0;

#ifdef DEBUG
	if (krdc->intf == NULL)
		cmn_err(CE_WARN,
		    "!rdc_net_read: null intf for index %d", local_index);
#endif
	/*
	 * switch on proto version.
	 */
	len = fba_len;		/* length (FBAs) still to xfer */
	rr.rr_bufsize = 0;	/* rpc data buffer length (bytes) */
	rr.rr_data = NULL;	/* rpc data buffer */
	transflag = rpc_flag | RDC_RREAD_START;	/* setup rpc */
	if (krdc->rpc_version <= RDC_VERSION5) {
		ASSERT(fba_pos <= INT32_MAX);
		list.pos = (int)fba_pos; /* fba position of start of chunk */
		list.cd = remote_index;	/* remote end cd */
		/* send setup rpc */
		list.flag = transflag;
		ASSERT(len <= INT32_MAX);
		list.len = (int)len;			/* total fba length */
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
		    krdc->rpc_version, xdr_rread, (char *)&list, xdr_int,
		    (char *)&ret, &t);

	} else {
		list6.pos = fba_pos;	/* fba position of start of chunk */
		list6.cd = remote_index;	/* remote end cd */
		/* send setup rpc */
		list6.flag = transflag;	/* setup rpc */
		ASSERT(len <= INT32_MAX);
		list6.len = (int)len;			/* total fba length */
		err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
		    krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_int,
		    (char *)&ret, &t);
	}

	if (err) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!rdc_net_read: setup err %d", err);
#endif
		if (err == RPC_INTR)
			ret = EINTR;
		else
			ret = ENOLINK;

		goto remote_rerror;
	}

	if (ret == 0) {		/* No valid index from r_net_read */
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!rdc_net_read: no valid index from r_net_read");
#endif
		return (ENOBUFS);
	}
	transflag = rpc_flag | RDC_RREAD_DATA;
	if (krdc->rpc_version <= RDC_VERSION5) {
		list.idx = ret;		/* save idx to return to server */
		list.flag = transflag;
					/* move on to data xfer rpcs */
	} else {
		list6.idx = ret;	/* save idx to return to server */
		list6.flag = transflag;
	}

	/* find starting position in handle */

	vec = handle->sb_vec;

	fba_pos -= handle->sb_pos;

	for (; fba_pos >= FBA_NUM(vec->sv_len); vec++)
		fba_pos -= FBA_NUM(vec->sv_len);

	sv_addr = vec->sv_addr + FBA_SIZE(fba_pos);	/* data in vector */
	sv_len = vec->sv_len - FBA_SIZE(fba_pos);	/* bytes in vector */

	while (len) {
		nsc_size_t translen;
		if (len > maxfbas) {
			translen = maxfbas;
		} else {
			translen = len;
		}

		if (FBA_SIZE(translen) > sv_len) {
			translen = FBA_NUM(sv_len);
		}

		len -= translen;
		if (len == 0) {
			/* last data xfer rpc - tell server to cleanup */
			transflag |= RDC_RREAD_END;
		}

		if (!rr.rr_data || (nsc_size_t)rr.rr_bufsize !=
		    FBA_SIZE(translen)) {
			if (rr.rr_data)
				kmem_free(rr.rr_data, rr.rr_bufsize);

			ASSERT(FBA_SIZE(translen) <= INT32_MAX);
			rr.rr_bufsize = FBA_SIZE(translen);
			rr.rr_data = kmem_alloc(rr.rr_bufsize, KM_NOSLEEP);
		}

		if (!rr.rr_data) {
			/* error */
#ifdef DEBUG
			cmn_err(CE_NOTE, "!rdc_net_read: kmem_alloc failed");
#endif
			return (ENOMEM);
		}

		/* get data from remote end */

#ifdef DEBUG
		if (krdc->intf == NULL)
			cmn_err(CE_WARN,
			    "!rdc_net_read: null intf for index %d",
			    local_index);
#endif
		if (krdc->io_kstats) {
			mutex_enter(krdc->io_kstats->ks_lock);
			kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
			mutex_exit(krdc->io_kstats->ks_lock);
		}
		/*CONSTCOND*/
		ASSERT(RDC_MAXDATA <= INT32_MAX);
		ASSERT(translen <= RDC_MAXDATA);
		if (krdc->rpc_version <= RDC_VERSION5) {
			list.len = (int)translen;
			list.flag = transflag;
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
			    krdc->rpc_version, xdr_rread, (char *)&list,
			    xdr_rdresult, (char *)&rr, &t);
		} else {
			list6.len = (int)translen;
			list6.flag = transflag;
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
			    krdc->rpc_version, xdr_rread6, (char *)&list6,
			    xdr_rdresult, (char *)&rr, &t);
		}

		if (krdc->io_kstats) {
			mutex_enter(krdc->io_kstats->ks_lock);
			kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
			mutex_exit(krdc->io_kstats->ks_lock);
		}

		if (err) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!rdc_net_read: rpc err %d", err);
#endif
			if (err == RPC_INTR) {
				ret = EINTR;
			} else {
				ret = ENOLINK;
			}

			goto remote_rerror;
		}

		if (rr.rr_status != RDC_OK) {
			ret = rdc_rrstatus_decode(rr.rr_status);
			if (!ret)
				ret = EIO;

			goto remote_rerror;
		}

		/* copy into handle */

		bcopy(rr.rr_data, sv_addr, (size_t)rr.rr_bufsize);

		/* update counters */

		sv_addr += rr.rr_bufsize;
		if (krdc->rpc_version <= RDC_VERSION5) {
			list.pos += translen;
		} else {
			list6.pos += translen;
		}
		if (krdc->io_kstats) {
			KSTAT_IO_PTR(krdc->io_kstats)->reads++;
			KSTAT_IO_PTR(krdc->io_kstats)->nread += rr.rr_bufsize;
		}
		ASSERT(sv_len <= INT32_MAX);
		ASSERT(sv_len >= (nsc_size_t)rr.rr_bufsize);
		sv_len -= rr.rr_bufsize;

		if (sv_len == 0) {
			/* goto next vector */
			vec++;
			sv_addr = vec->sv_addr;
			sv_len = vec->sv_len;
		}
	}

	if (rr.rr_data)
		kmem_free(rr.rr_data, rr.rr_bufsize);

	return (0);

remote_rerror:
	if (rr.rr_data)
		kmem_free(rr.rr_data, rr.rr_bufsize);

	return (ret ? ret : ENOLINK);
}

/*
 * rdc_net_write
 * Main remote write client side
 * Handles protocol selection as well as requests for remote allocation
 * and data transfer
 * Does local IO for FCAL
 * caller must clear bitmap on success
 */

int
rdc_net_write(int local_index, int remote_index, nsc_buf_t *handle,
    nsc_off_t fba_pos, nsc_size_t fba_len, uint_t aseq, int qpos,
    netwriteres *netres)
{
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	struct timeval t;
	nsc_vec_t *vec;
	int sv_len;
	nsc_off_t fpos;
	int err;
	struct netwriteres netret;
	struct netwriteres *netresptr;
	struct net_data5 dlist5;
	struct net_data6 dlist6;
	int ret;
	nsc_size_t maxfbas;
	int transflag;
	int translen;
	int transendoblk;
	char *transptr;
	int vflags;

	if (handle == NULL)
		return (EINVAL);

	/* if not a diskq buffer */
	if ((qpos == -1) && (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len))) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!rdc_net_write: handle bounds");
#endif
		return (EINVAL);
	}


	t.tv_sec = rdc_rpc_tmout;
	t.tv_usec = 0;

	krdc = &rdc_k_info[local_index];
	urdc = &rdc_u_info[local_index];

	maxfbas = MAX_RDC_FBAS;

	/* FCAL IO */
	if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
		nsc_buf_t *remote_h = NULL;
		int reserved = 0;

		ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
		if (RDC_SUCCESS(ret)) {
			reserved = 1;
			ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
			    NSC_WRBUF, &remote_h);
		}
		if (RDC_SUCCESS(ret)) {
			ret = nsc_copy(handle, remote_h, fba_pos, fba_pos,
			    fba_len);
			if (RDC_SUCCESS(ret))
				ret = nsc_write(remote_h, fba_pos, fba_len, 0);
			if (RDC_SUCCESS(ret)) {
				(void) nsc_free_buf(remote_h);
				nsc_release(krdc->remote_fd);
				return (0);
			}
		}
		rdc_group_enter(krdc);
		rdc_set_flags(urdc, RDC_FCAL_FAILED);
		rdc_group_exit(krdc);
		if (remote_h)
			(void) nsc_free_buf(remote_h);
		if (reserved)
			nsc_release(krdc->remote_fd);
	}

	/*
	 * At this point we must decide which protocol we are using and
	 * do the right thing
	 */
	netret.vecdata.vecdata_val = NULL;
	netret.vecdata.vecdata_len = 0;
	if (netres) {
		netresptr = netres;
	} else {
		netresptr = &netret;
	}

	vflags = rdc_get_vflags(urdc);

	if (vflags & (RDC_VOL_FAILED|RDC_BMP_FAILED))
		transflag = RDC_RWRITE_FAIL;
	else
		transflag = 0;


#ifdef DEBUG
	if (krdc->intf == NULL)
		cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d",
		    local_index);
#endif

	vec = handle->sb_vec;

	/*
	 * find starting position in vector
	 */
	if ((qpos == -1) || (handle->sb_user == RDC_NULLBUFREAD))
		fpos = fba_pos - handle->sb_pos;
	else
		fpos = (qpos + 1) - handle->sb_pos;

	for (; fpos >= FBA_NUM(vec->sv_len); vec++)
		fpos -= FBA_NUM(vec->sv_len);
	sv_len = vec->sv_len - FBA_SIZE(fpos);	/* bytes in vector */
	transptr = (char *)vec->sv_addr + FBA_SIZE(fpos);

	if (krdc->rpc_version <= RDC_VERSION5) {
		dlist5.local_cd = local_index;
		dlist5.cd = remote_index;
		ASSERT(fba_len <= INT32_MAX);
		ASSERT(fba_pos <= INT32_MAX);
		dlist5.len = (int)fba_len;
		dlist5.pos = (int)fba_pos;
		dlist5.idx = -1; /* Starting index */
		dlist5.flag = transflag;
		dlist5.seq = aseq;		/* sequence number */
		dlist5.sfba = (int)fba_pos;	/* starting fba for this xfer */
	} else {
		dlist6.local_cd = local_index;
		dlist6.cd = remote_index;
		ASSERT(fba_len <= INT32_MAX);
		dlist6.len = (int)fba_len;
		dlist6.qpos = qpos;
		dlist6.pos = fba_pos;
		dlist6.idx = -1; /* Starting index */
		dlist6.flag = transflag;
		dlist6.seq = aseq;		/* sequence number */
		dlist6.sfba = fba_pos;		/* starting fba for this xfer */
	}

	transendoblk = 0;
	while (fba_len) {
		if (!transptr) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!rdc_net_write: walked off end of handle!");
#endif
			ret = EINVAL;
			goto remote_error;
		}

		if (fba_len > maxfbas) {
			ASSERT(maxfbas <= INT32_MAX);
			translen = (int)maxfbas;
		} else {
			ASSERT(fba_len <= INT32_MAX);
			translen = (int)fba_len;
		}

		if (FBA_SIZE(translen) > sv_len) {
			translen = FBA_NUM(sv_len);
		}

		fba_len -= translen;
		if (fba_len == 0) {
			/* last data xfer - tell server to commit */
			transendoblk = 1;
		}


#ifdef DEBUG
		if (krdc->intf == NULL)
			cmn_err(CE_WARN,
			    "!rdc_net_write: null intf for index %d",
			    local_index);
#endif
		DTRACE_PROBE(rdc_netwrite_clntcall_start);

		if (krdc->io_kstats) {
			mutex_enter(krdc->io_kstats->ks_lock);
			kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
			mutex_exit(krdc->io_kstats->ks_lock);
		}
		if (krdc->rpc_version <= RDC_VERSION5) {
			ret = 0;
			dlist5.nfba = translen;
			dlist5.endoblk = transendoblk;
			dlist5.data.data_len = FBA_SIZE(translen);
			dlist5.data.data_val = transptr;
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE5,
			    krdc->rpc_version, xdr_net_data5,
			    (char *)&dlist5, xdr_int,
			    (char *)&ret, &t);
			if (ret >= 0) {
				netresptr->result = 0;
				netresptr->index = ret;
			} else {
				netresptr->result = ret;
			}
		} else {
			netresptr->result = 0;
			dlist6.nfba = translen;
			dlist6.endoblk = transendoblk;
			dlist6.data.data_len = FBA_SIZE(translen);
			dlist6.data.data_val = transptr;
			err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6,
			    krdc->rpc_version, xdr_net_data6,
			    (char *)&dlist6, xdr_netwriteres,
			    (char *)netresptr, &t);
		}

		if (krdc->io_kstats) {
			mutex_enter(krdc->io_kstats->ks_lock);
			kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
			mutex_exit(krdc->io_kstats->ks_lock);
		}

		DTRACE_PROBE(rdc_netwrite_clntcall_end);
		ret = netresptr->result;
		if (err) {
			if (err == RPC_INTR)
				ret = EINTR;
			else if (err && ret != EPROTO)
				ret = ENOLINK;
#ifdef DEBUG
			cmn_err(CE_NOTE,
			    "!rdc_net_write(5): cd %d err %d ret %d",
			    remote_index, err, ret);
#endif
			goto remote_error;
		}
		/* Error from r_net_write5 */
		if (netresptr->result < 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE,
			    "!rdc_net_write: r_net_write(5) "
			    "returned: %d",
			    -netresptr->result);
#endif
			ret = -netresptr->result;
			if (netret.vecdata.vecdata_val)
				kmem_free(netret.vecdata.vecdata_val,
				    netret.vecdata.vecdata_len *
				    sizeof (net_pendvec_t));
			goto remote_error;
		} else if (netresptr->index == 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE,
			    "!rdc_net_write: no valid index from "
			    "r_net_write(5)");
#endif
			ret = ENOBUFS;
			if (netret.vecdata.vecdata_val)
				kmem_free(netret.vecdata.vecdata_val,
				    netret.vecdata.vecdata_len *
				    sizeof (net_pendvec_t));
			goto remote_error;
		}
		if (krdc->rpc_version <= RDC_VERSION5) {
			dlist5.idx = netresptr->index;
			dlist5.sfba += dlist5.nfba;
		} else {
			dlist6.idx = netresptr->index;
			dlist6.sfba += dlist6.nfba;
		}
		/* update counters */
		if (krdc->io_kstats) {
			KSTAT_IO_PTR(krdc->io_kstats)->writes++;
			KSTAT_IO_PTR(krdc->io_kstats)->nwritten +=
			    FBA_SIZE(translen);
		}
		transptr += FBA_SIZE(translen);
		sv_len -= FBA_SIZE(translen);

		if (sv_len <= 0) {
			/* goto next vector */
			vec++;
			transptr = (char *)vec->sv_addr;
			sv_len = vec->sv_len;
		}
	}
	/*
	 * this can't happen.....
	 */
	if (netret.vecdata.vecdata_val)
		kmem_free(netret.vecdata.vecdata_val,
		    netret.vecdata.vecdata_len *
		    sizeof (net_pendvec_t));

	return (0);

remote_error:
	return (ret ? ret : ENOLINK);
}

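/*
 * rdc_fixlen
 * Recompute the queue handle's length by summing its io vectors;
 * used when the last reference to a shared diskq buffer is dropped.
 */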
void
rdc_fixlen(rdc_aio_t *aio)
{
	nsc_vec_t *vecp = aio->qhandle->sb_vec;
	nsc_size_t len = 0;

	while (vecp->sv_addr) {
		len += FBA_NUM(vecp->sv_len);
		vecp++;
	}
	aio->qhandle->sb_len = len;
}

/*
 * rdc_dump_alloc_bufs_cd
 * Dump allocated buffers (rdc_net_hnd's) for the specified cd.
 * This could be the flusher failing; if so, don't do the delay forever.
 * Returns: 0 (success), EAGAIN (caller needs to try again).
 */
int
rdc_dump_alloc_bufs_cd(int index)
{
	rdc_k_info_t *krdc;
	rdc_aio_t *aio;
	net_queue *q;
	disk_queue *dq;
	kmutex_t *qlock;

	krdc = &rdc_k_info[index];


	if (!krdc->c_fd) {
		/* cannot do anything! */
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dump_alloc_bufs_cd(%d): c_fd NULL",
		    index);
#endif
		return (0);
	}
	rdc_dump_dsets(index);

	dq = &krdc->group->diskq;

	if (RDC_IS_DISKQ(krdc->group)) {
		qlock = QLOCK(dq);
		(void) _rdc_rsrv_diskq(krdc->group);
	} else {
		qlock = &krdc->group->ra_queue.net_qlock;
	}

	/*
	 * Now dump the async queue anonymous buffers.
	 * If we are a diskq, then we are using the diskq mutex.
	 * However, we are flushing from diskq to memory queue,
	 * so we now need to grab the memory lock also.
	 */

	q = &krdc->group->ra_queue;

	if (RDC_IS_DISKQ(krdc->group)) {
		mutex_enter(&q->net_qlock);
		if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
			int tries = 5;
#ifdef DEBUG_DISKQ
			cmn_err(CE_NOTE,
			    "!dumpalloccd sending diskq->memq flush to sleep");
#endif
			q->qfflags |= RDC_QFILLSLEEP;
			mutex_exit(&q->net_qlock);

			while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
				delay(5);
			mutex_enter(&q->net_qlock);
		}
	}

	mutex_enter(qlock);

	while ((q->net_qhead != NULL)) {
		rdc_k_info_t *tmpkrdc;
		aio = q->net_qhead;
		tmpkrdc = &rdc_k_info[aio->index];

		if (RDC_IS_DISKQ(krdc->group)) {
			aio->qhandle->sb_user--;
			if (aio->qhandle->sb_user == 0) {
				rdc_fixlen(aio);
				(void) nsc_free_buf(aio->qhandle);
				aio->qhandle = NULL;
				aio->handle = NULL;
			}
		} else {
			if (aio->handle) {
				(void) nsc_free_buf(aio->handle);
				aio->handle = NULL;
			}
		}

		if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(krdc->group)) {
			mutex_enter(tmpkrdc->io_kstats->ks_lock);
			kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
			mutex_exit(tmpkrdc->io_kstats->ks_lock);
		}
		q->net_qhead = q->net_qhead->next;
		q->blocks -= aio->len;
		q->nitems--;

		RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);

		kmem_free(aio, sizeof (*aio));
	}
	q->net_qtail = NULL;

	if (krdc->group->asyncstall) {
		krdc->group->asyncdis = 1;
		cv_broadcast(&krdc->group->asyncqcv);
	}
	if (krdc->group->sleepq) {
		rdc_sleepqdiscard(krdc->group);
	}

	krdc->group->seq = RDC_NEWSEQ;
	krdc->group->seqack = RDC_NEWSEQ;
	if (RDC_IS_DISKQ(krdc->group)) {
		rdc_dump_iohdrs(dq);
		SET_QNXTIO(dq, QHEAD(dq));
		SET_QCOALBOUNDS(dq, QHEAD(dq));
	}
	mutex_exit(qlock);

	if (RDC_IS_DISKQ(krdc->group)) {
		mutex_exit(&q->net_qlock);
		_rdc_rlse_diskq(krdc->group);
	}

	return (0);
}


/*
 * rdc_dump_alloc_bufs
 * We have an error on the link
 * Try to dump all of the allocated bufs so we can cleanly recover
 * and not hang
 */
void
rdc_dump_alloc_bufs(rdc_if_t *ip)
{
	rdc_k_info_t *krdc;
	int repeat;
	int index;

	for (index = 0; index < rdc_max_sets; index++) {
		do {
			krdc = &rdc_k_info[index];
			repeat = 0;
			if (krdc->intf == ip) {
				if (rdc_dump_alloc_bufs_cd(index) == EAGAIN) {
					repeat = 1;
					delay(2);
				}
			}
		} while (repeat);
	}
}

/*
 * returns 1 if the throttle should throttle, 0 if not.
 */
int
_rdc_diskq_isfull(disk_queue *q, long len)
{
	/* ---T----H----N--- */
	mutex_enter(QLOCK(q));

	if (FITSONQ(q, len + 1)) {
		mutex_exit(QLOCK(q));
		return (0);
	}
	mutex_exit(QLOCK(q));
	return (1);
}

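/*
 * _rdc_async_throttle
 * Hold up the caller while the async queue for this set (memory or
 * disk based) is over its configured limits, kicking the writer as
 * needed; a full non-blocking diskq is failed into logging mode.
 * Walks the one-to-many chain so every set sharing this volume is
 * checked.
 */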
void
_rdc_async_throttle(rdc_k_info_t *this, long len)
{
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	int print_msg = 1;
	int tries = RDC_FUTILE_ATTEMPTS;

	/*
	 * Throttle entries on queue
	 */

	/* Need to take the 1-many case into account, checking all sets */

	/* ADD HANDY HEURISTIC HERE TO SLOW DOWN IO */
	for (krdc = this; /* CSTYLED */; krdc = krdc->many_next) {
		urdc = &rdc_u_info[krdc->index];

		/*
		 * This may be the last set standing in a one to many setup.
		 * We may also be stuck in unintercept, after marking
		 * the volume as not enabled, but have not removed it
		 * from the many list, resulting in an endless loop if
		 * we just continue here. Let's jump over this stuff
		 * and check to see if we are the only dude here.
		 */
		if (!IS_ENABLED(urdc))
			goto thischeck;

		if (IS_ASYNC(urdc) && RDC_IS_MEMQ(krdc->group)) {
			net_queue *q = &krdc->group->ra_queue;
			while ((q->blocks + q->inflbls) > urdc->maxqfbas ||
			    (q->nitems + q->inflitems) > urdc->maxqitems) {

				if (!IS_ENABLED(urdc)) /* disable race */
					goto thischeck;

				if (!krdc->group->rdc_writer)
					(void) rdc_writer(krdc->index);
				delay(2);
				q->throttle_delay++;
			}
		}

		/* do a much more aggressive delay, get disk flush going */
		if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group)) {
			disk_queue *q = &krdc->group->diskq;
			while ((!IS_QSTATE(q, RDC_QNOBLOCK)) &&
			    (_rdc_diskq_isfull(q, len)) &&
			    (!IS_STATE(urdc, RDC_DISKQ_FAILED))) {
				if (print_msg) {
					cmn_err(CE_WARN, "!rdc async throttle:"
					    " disk queue %s full",
					    &urdc->disk_queue[0]);

					print_msg = 0;
				}
				if (!IS_ENABLED(urdc)) /* disable race */
					goto thischeck;

				if (!krdc->group->rdc_writer)
					(void) rdc_writer(krdc->index);
				delay(10);
				q->throttle_delay += 10;

				if (!(tries--) && IS_STATE(urdc, RDC_QUEUING)) {
					cmn_err(CE_WARN, "!SNDR: disk queue "
					    "%s full & not flushing. giving up",
					    &urdc->disk_queue[0]);
					cmn_err(CE_WARN, "!SNDR: %s:%s entering"
					    " logging mode",
					    urdc->secondary.intf,
					    urdc->secondary.file);
					rdc_fail_diskq(krdc, RDC_WAIT,
					    RDC_DOLOG | RDC_NOFAIL);
					mutex_enter(QLOCK(q));
					cv_broadcast(&q->qfullcv);
					mutex_exit(QLOCK(q));
				}

			}
			if ((IS_QSTATE(q, RDC_QNOBLOCK)) &&
			    _rdc_diskq_isfull(q, len) &&
			    !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
				if (print_msg) {
					cmn_err(CE_WARN, "!disk queue %s full",
					    &urdc->disk_queue[0]);
					print_msg = 0;
				}
				rdc_fail_diskq(krdc, RDC_WAIT,
				    RDC_DOLOG | RDC_NOFAIL);
				mutex_enter(QLOCK(q));
				cv_broadcast(&q->qfullcv);
				mutex_exit(QLOCK(q));
			}
		}

thischeck:
		if (krdc->many_next == this)
			break;
	}
}

int rdc_coalesce = 1;
static int rdc_joins = 0;

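/*
 * rdc_aio_coalesce
 * If the new aio is strictly adjacent to the tail of the queue (and
 * for the same set, and the combined length fits in one transfer),
 * merge the two into a single anonymous buffer and fix up the bitmap
 * reference counts. Returns 1 if the new I/O was merged, 0 if not.
 */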
int
rdc_aio_coalesce(rdc_aio_t *queued, rdc_aio_t *new)
{
	nsc_buf_t *h = NULL;
	int rc;
	rdc_k_info_t *krdc;
	uint_t bitmask;

	if (rdc_coalesce == 0)
		return (0);		/* don't even try */

	if ((queued == NULL) ||
	    (queued->handle == NULL) ||
	    (new->handle == NULL)) {
		return (0);		/* existing queue is empty */
	}
	if (queued->index != new->index || queued->len + new->len >
	    MAX_RDC_FBAS) {
		return (0);		/* I/O too big */
	}
	if ((queued->pos + queued->len == new->pos) ||
	    (new->pos + new->len == queued->pos)) {
		rc = nsc_alloc_abuf(queued->pos, queued->len + new->len, 0,
		    &h);
		if (!RDC_SUCCESS(rc)) {
			if (h != NULL)
				(void) nsc_free_buf(h);
			return (0);		/* couldn't do coalesce */
		}
		rc = nsc_copy(queued->handle, h, queued->pos, queued->pos,
		    queued->len);
		if (!RDC_SUCCESS(rc)) {
			(void) nsc_free_buf(h);
			return (0);		/* couldn't do coalesce */
		}
		rc = nsc_copy(new->handle, h, new->pos, new->pos,
		    new->len);
		if (!RDC_SUCCESS(rc)) {
			(void) nsc_free_buf(h);
			return (0);		/* couldn't do coalesce */
		}

		krdc = &rdc_k_info[queued->index];

		RDC_SET_BITMASK(queued->pos, queued->len, &bitmask);
		RDC_CLR_BITMAP(krdc, queued->pos, queued->len,
		    bitmask, RDC_BIT_BUMP);

		RDC_SET_BITMASK(new->pos, new->len, &bitmask);
		RDC_CLR_BITMAP(krdc, new->pos, new->len,
		    bitmask, RDC_BIT_BUMP);

		(void) nsc_free_buf(queued->handle);
		(void) nsc_free_buf(new->handle);
		queued->handle = h;
		queued->len += new->len;
		bitmask = 0;
		/*
		 * bump the ref count back up
		 */

		RDC_SET_BITMAP(krdc, queued->pos, queued->len, &bitmask);
		return (1);	/* new I/O succeeds last I/O queued */
	}
	return (0);
}

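/*
 * rdc_memq_enqueue
 * Add an aio to the in-memory async queue, coalescing it with the
 * tail entry when possible, and kick off a writer thread if one is
 * not already running.
 */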
int
rdc_memq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
{
	net_queue *q;
	rdc_group_t *group;

	group = krdc->group;
	q = &group->ra_queue;

	mutex_enter(&q->net_qlock);

	if (rdc_aio_coalesce(q->net_qtail, aio)) {
		rdc_joins++;
		q->blocks += aio->len;
		kmem_free(aio, sizeof (*aio));
		goto out;
	}
	aio->seq = group->seq++;
	if (group->seq < aio->seq)
		group->seq = RDC_NEWSEQ + 1; /* skip magics */

	if (q->net_qhead == NULL) {
		/* adding to empty q */
		q->net_qhead = q->net_qtail = aio;

#ifdef DEBUG
		if (q->blocks != 0 || q->nitems != 0) {
			cmn_err(CE_PANIC,
			    "rdc enqueue: q %p, qhead 0, q blocks %" NSC_SZFMT
			    ", nitems %" NSC_SZFMT,
			    (void *) q, q->blocks, q->nitems);
		}
#endif

	} else {
		/* discontiguous, add aio to q tail */
		q->net_qtail->next = aio;
		q->net_qtail = aio;
	}

	q->blocks += aio->len;
	q->nitems++;

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}
out:
#ifdef DEBUG
	/* sum the q and check for sanity */
	{
		nsc_size_t qblocks = 0;
		uint64_t nitems = 0;
		rdc_aio_t *a;

		for (a = q->net_qhead; a != NULL; a = a->next) {
			qblocks += a->len;
			nitems++;
		}

		if (qblocks != q->blocks || nitems != q->nitems) {
			cmn_err(CE_PANIC,
			    "rdc enqueue: q %p, q blocks %" NSC_SZFMT " (%"
			    NSC_SZFMT "), nitems %" NSC_SZFMT " (%"
			    NSC_SZFMT ")", (void *) q, q->blocks, qblocks,
			    q->nitems, nitems);
		}
	}
#endif

	mutex_exit(&q->net_qlock);

	if (q->nitems > q->nitems_hwm) {
		q->nitems_hwm = q->nitems;
	}

	if (q->blocks > q->blocks_hwm) {
		q->blocks_hwm = q->blocks;
	}

	if (!krdc->group->rdc_writer)
		(void) rdc_writer(krdc->index);

	return (0);
}

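/*
 * _rdc_enqueue_write
 * Wrap the given write (position, length, buffer handle) in an
 * rdc_aio_t and hand it to the memory or disk queue, whichever this
 * group is configured to use.
 */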
int
_rdc_enqueue_write(rdc_k_info_t *krdc, nsc_off_t pos, nsc_size_t len, int flag,
    nsc_buf_t *h)
{
	rdc_aio_t *aio;
	rdc_group_t *group;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int rc;

	aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
	if (!aio) {
		return (ENOMEM);
	}

	group = krdc->group;

	aio->pos = pos;
	aio->qpos = -1;
	aio->len = len;
	aio->flag = flag;
	aio->index = krdc->index;
	aio->handle = h;

	if (group->flags & RDC_MEMQUE) {
		return (rdc_memq_enqueue(krdc, aio));
	} else if ((group->flags & RDC_DISKQUE) &&
	    !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
		rc = rdc_diskq_enqueue(krdc, aio);
		kmem_free(aio, sizeof (*aio));
		return (rc);
	}
	return (-1); /* keep lint quiet */
}




/*
 * Async Network RDC flusher
 */

/*
 * don't allow any new writer threads to start if a member of the set
 * is disable pending
 */
int
is_disable_pending(rdc_k_info_t *krdc)
{
	rdc_k_info_t *this = krdc;
	int rc = 0;

	do {
		if (krdc->type_flag & RDC_DISABLEPEND) {
			krdc = this;
			rc = 1;
			break;
		}
		krdc = krdc->group_next;

	} while (krdc != this);

	return (rc);
}

/*
 * rdc_writer -- spawn new writer if not running already
 *	called after enqueuing the dirty blocks
 */
int
rdc_writer(int index)
{
	rdc_k_info_t *krdc = &rdc_k_info[index];
	nsthread_t *t;
	rdc_group_t	*group;
	kmutex_t	*qlock;
	int tries;
	const int MAX_TRIES = 16;

	group = krdc->group;

	if (RDC_IS_DISKQ(group))
		qlock = &group->diskq.disk_qlock;
	else
		qlock = &group->ra_queue.net_qlock;

	mutex_enter(qlock);

#ifdef DEBUG
	if (noflush) {
		mutex_exit(qlock);
		return (0);
	}
#endif

	if ((group->rdc_writer) || is_disable_pending(krdc)) {
		mutex_exit(qlock);
		return (0);
	}

	if ((group->rdc_thrnum >= 1) && (group->seqack == RDC_NEWSEQ)) {
		/*
		 * We also need to check if we are starting a new
		 * sequence, and if so don't create a new thread,
		 * as we must ensure that the start of new sequence
		 * requests arrives first to re-init the server.
		 */
		mutex_exit(qlock);
		return (0);
	}
	/*
	 * For version 6,
	 * see if we can fit in another thread.
	 */
	group->rdc_thrnum++;

	if (krdc->intf && (krdc->intf->rpc_version >= RDC_VERSION6)) {
		rdc_u_info_t *urdc = &rdc_u_info[index];
		if (group->rdc_thrnum >= urdc->asyncthr)
			group->rdc_writer = 1;
	} else {
		group->rdc_writer = 1;
	}

	mutex_exit(qlock);


	/*
	 * If we got here, we know that we have not exceeded the allowed
	 * number of async threads for our group.  If we run out of threads
	 * in _rdc_flset, we add a new thread to the set.
	 */
	tries = 0;
	do {
		/* first try to grab a thread from the free list */
		if (t = nst_create(_rdc_flset, rdc_flusher_thread,
		    (blind_t)(unsigned long)index, 0)) {
			break;
		}

		/* that failed; add a thread to the set and try again */
		if (nst_add_thread(_rdc_flset, 1) != 1) {
			cmn_err(CE_WARN, "!rdc_writer index %d nst_add_thread "
			    "error, tries: %d", index, tries);
			break;
		}
	} while (++tries < MAX_TRIES);

	if (tries) {
		mutex_enter(&group->addthrnumlk);
		group->rdc_addthrnum += tries;
		mutex_exit(&group->addthrnumlk);
	}

	if (t) {
		return (1);
	}

	cmn_err(CE_WARN, "!rdc_writer: index %d nst_create error", index);
	rdc_many_enter(krdc);
	mutex_enter(qlock);
	group->rdc_thrnum--;
	group->rdc_writer = 0;
	if ((group->count == 0) && (group->rdc_thrnum == 0)) {
		mutex_exit(qlock);
		/*
		 * Race with remove_from_group while write thread was
		 * failing to be created.
		 */
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_writer: group being destroyed");
#endif
		rdc_delgroup(group);
		krdc->group = NULL;
		rdc_many_exit(krdc);
		return (-1);
	}
	mutex_exit(qlock);
	rdc_many_exit(krdc);
	return (-1);
}

/*
 * Either we need to flush the kmem (net_queue) queue or the disk
 * (disk_queue) queue; determine which, and do it.
 */
void
rdc_flusher_thread(int index)
{
	rdc_k_info_t *krdc = &rdc_k_info[index];

	if (krdc->group->flags & RDC_MEMQUE) {
		rdc_flush_memq(index);
		return;
	} else if (krdc->group->flags & RDC_DISKQUE) {
		rdc_flush_diskq(index);
		return;
	} else { /* uh-oh, big time */
		cmn_err(CE_PANIC, "flusher trying to flush unknown queue type");
	}

}

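/*
 * rdc_flush_memq
 * Drain the in-memory async queue for this set, flushing each entry
 * over the network in turn. The thread exits when the queue is empty,
 * the link goes away, or the queue sequence number is reset under it.
 */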
void
rdc_flush_memq(int index)
{
	rdc_k_info_t *krdc = &rdc_k_info[index];
	rdc_aio_t *aio;
	net_queue *q;
	int dowork;
	rdc_group_t *group = krdc->group;
	if (!group || group->count == 0) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
#endif
		return;
	}

	if (!krdc->c_fd) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_memq: no c_fd!");
#endif
		goto thread_death;
	}

#ifdef DEBUG_DISABLE
	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
		cmn_err(CE_WARN, "!rdc_flush_memq: DISABLE PENDING!");
		/*
		 * Need to continue as we may be trying to flush IO
		 * while trying to disable or suspend
		 */
	}
#endif

	q = &group->ra_queue;

	dowork = 1;
	/* CONSTCOND */
	while (dowork) {
		if (net_exit == ATM_EXIT)
			break;

		group = krdc->group;
		if (!group || group->count == 0) {
#ifdef DEBUG
			cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
#endif
			break;
		}

		mutex_enter(&q->net_qlock);
		aio = q->net_qhead;

		if (aio == NULL) {
#ifdef DEBUG
			if (q->nitems != 0 ||
			    q->blocks != 0 ||
			    q->net_qtail != 0) {
				cmn_err(CE_PANIC,
				    "rdc_flush_memq(1): q %p, q blocks %"
				    NSC_SZFMT ", nitems %" NSC_SZFMT
				    ", qhead %p qtail %p",
				    (void *) q, q->blocks, q->nitems,
				    (void *) aio, (void *) q->net_qtail);
			}
#endif
			mutex_exit(&q->net_qlock);
			break;
		}

		/* aio remove from q */

		q->net_qhead = aio->next;
		aio->next = NULL;

		if (q->net_qtail == aio)
			q->net_qtail = q->net_qhead;

		q->blocks -= aio->len;
		q->nitems--;

		/*
		 * in flight numbers.
		 */
		q->inflbls += aio->len;
		q->inflitems++;

#ifdef DEBUG
		if (q->net_qhead == NULL) {
			if (q->nitems != 0 ||
			    q->blocks != 0 ||
			    q->net_qtail != 0) {
				cmn_err(CE_PANIC,
				    "rdc_flush_memq(2): q %p, q blocks %"
				    NSC_SZFMT ", nitems %" NSC_SZFMT
				    ", qhead %p qtail %p",
				    (void *) q, q->blocks, q->nitems,
				    (void *) q->net_qhead,
				    (void *) q->net_qtail);
			}
		}

#ifndef NSC_MULTI_TERABYTE
		if (q->blocks < 0) {
			cmn_err(CE_PANIC,
			    "rdc_flush_memq(3): q %p, q blocks %" NSC_SZFMT
			    ", nitems %d, qhead %p, qtail %p",
			    (void *) q, q->blocks, q->nitems,
			    (void *) q->net_qhead, (void *) q->net_qtail);
		}
#else
		/* blocks and nitems are unsigned for NSC_MULTI_TERABYTE */
#endif
#endif

		mutex_exit(&q->net_qlock);

		aio->iostatus = RDC_IO_INIT;

		_rdc_remote_flush(aio);

		mutex_enter(&q->net_qlock);
		q->inflbls -= aio->len;
		q->inflitems--;
		if ((group->seqack == RDC_NEWSEQ) &&
		    (group->seq != RDC_NEWSEQ + 1)) {
			if ((q->net_qhead == NULL) ||
			    (q->net_qhead->seq != RDC_NEWSEQ + 1)) {
				/*
				 * We are an old thread, and the
				 * queue sequence has been reset
				 * during the network write above.
				 * As such we mustn't pull another
				 * job from the queue until the
				 * first sequence message has been ack'ed.
				 * Just die instead. Unless this thread
				 * is the first sequence that has just
				 * been ack'ed
				 */
				dowork = 0;
			}
		}
		mutex_exit(&q->net_qlock);

		if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
			rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
			if (krdctmp->type_flag & RDC_DISABLEPEND) {
				kmem_free(aio, sizeof (*aio));
				goto thread_death;
			}
			rdc_group_enter(krdc);
			ASSERT(krdc->group);
			rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
			    "memq flush aio status not RDC_IO_DONE");
			rdc_group_exit(krdc);
			rdc_dump_queue(aio->index);
		}
		kmem_free(aio, sizeof (*aio));

		if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
			break;
	}

thread_death:
	rdc_many_enter(krdc);
	mutex_enter(&group->ra_queue.net_qlock);
	group->rdc_thrnum--;
	group->rdc_writer = 0;
	/*
	 * all threads must be dead.
	 */
	if ((group->count == 0) && (group->rdc_thrnum == 0)) {
		mutex_exit(&group->ra_queue.net_qlock);
		/*
		 * Group now empty, so destroy
		 * Race with remove_from_group while write thread was running
		 */
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_memq: group being destroyed");
#endif
		rdc_delgroup(group);
		krdc->group = NULL;
		rdc_many_exit(krdc);
		return;
	}
	mutex_exit(&group->ra_queue.net_qlock);
	rdc_many_exit(krdc);
}

/*
 * rdc_flush_diskq
 * disk queue flusher
 */
void
rdc_flush_diskq(int index)
{
	rdc_k_info_t *krdc = &rdc_k_info[index];
	rdc_u_info_t *urdc = &rdc_u_info[index];
	rdc_aio_t *aio = NULL;
	disk_queue *q;
	net_queue *nq;
	int dowork;
	int rc;
	rdc_group_t *group = krdc->group;

	if (!group || group->count == 0) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
#endif
		return;
	}

	if (!krdc->c_fd) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_diskq: no c_fd!");
#endif
		return;
	}

#ifdef DEBUG_DISABLE
	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
		cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING!");
		/*
		 * Need to continue as we may be trying to flush IO
		 * while trying to disable or suspend
		 */
	}
#endif
	q = &group->diskq;
	nq = &group->ra_queue;

	if (IS_QSTATE(q, RDC_QDISABLEPEND) || IS_STATE(urdc, RDC_LOGGING)) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!flusher thread death 1 %x", QSTATE(q));
#endif
		goto thread_death;
	}

	dowork = 1;
	/* CONSTCOND */
	while (dowork) {
		if (net_exit == ATM_EXIT)
			break;

		group = krdc->group;
		if (!group || group->count == 0) {
#ifdef DEBUG
			cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
#endif
			break;
		}

		do {
			rc = 0;
			if ((IS_STATE(urdc, RDC_LOGGING)) ||
			    (IS_STATE(urdc, RDC_SYNCING)) ||
			    (nq->qfflags & RDC_QFILLSLEEP))
				goto thread_death;

			aio = rdc_dequeue(krdc, &rc);

			if ((IS_STATE(urdc, RDC_LOGGING)) ||
			    (IS_STATE(urdc, RDC_SYNCING)) ||
			    (nq->qfflags & RDC_QFILLSLEEP)) {
				goto thread_death;
			}
			if (rc == EAGAIN) {
				delay(40);
			}

		} while (rc == EAGAIN);

		if (aio == NULL) {
			break;
		}

		aio->iostatus = RDC_IO_INIT;

		mutex_enter(QLOCK(q));
		q->inflbls += aio->len;
		q->inflitems++;
		mutex_exit(QLOCK(q));

		_rdc_remote_flush(aio);

		mutex_enter(QLOCK(q));
		q->inflbls -= aio->len;
		q->inflitems--;

		if ((group->seqack == RDC_NEWSEQ) &&
		    (group->seq != RDC_NEWSEQ + 1)) {
			if ((nq->net_qhead == NULL) ||
			    (nq->net_qhead->seq != RDC_NEWSEQ + 1)) {
				/*
				 * We are an old thread, and the
				 * queue sequence has been reset
				 * during the network write above.
				 * As such we mustn't pull another
				 * job from the queue until the
				 * first sequence message has been ack'ed.
				 * Just die instead. Unless of course,
				 * this thread is the first sequence that
				 * has just been ack'ed.
				 */
				dowork = 0;
			}
		}
		mutex_exit(QLOCK(q));

		if (aio->iostatus == RDC_IO_CANCELLED) {
			rdc_dump_queue(aio->index);
			kmem_free(aio, sizeof (*aio));
			aio = NULL;
			if (group) { /* seq gets bumped on dequeue */
				mutex_enter(QLOCK(q));
				rdc_dump_iohdrs(q);
				SET_QNXTIO(q, QHEAD(q));
				SET_QCOALBOUNDS(q, QHEAD(q));
				group->seq = RDC_NEWSEQ;
				group->seqack = RDC_NEWSEQ;
				mutex_exit(QLOCK(q));
			}
			break;
		}

		if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
			rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
			if (krdctmp->type_flag & RDC_DISABLEPEND) {
				kmem_free(aio, sizeof (*aio));
				aio = NULL;
				goto thread_death;
			}
			rdc_group_enter(krdc);
			rdc_group_log(krdc,
			    RDC_NOFLUSH | RDC_ALLREMOTE | RDC_QUEUING,
			    "diskq flush aio status not RDC_IO_DONE");
			rdc_group_exit(krdc);
			rdc_dump_queue(aio->index);
		}

		kmem_free(aio, sizeof (*aio));
		aio = NULL;

#ifdef DEBUG_DISABLE
		if (krdc->type_flag & RDC_DISABLEPEND) {
			cmn_err(CE_WARN,
			    "!rdc_flush_diskq: DISABLE PENDING after IO!");
		}
#endif
		if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
			break;

		if (IS_QSTATE(q, RDC_QDISABLEPEND)) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!flusher thread death 2");
#endif
			break;
		}
	}
thread_death:
	rdc_many_enter(krdc);
	mutex_enter(QLOCK(q));
	group->rdc_thrnum--;
	group->rdc_writer = 0;

	if (aio && aio->qhandle) {
		aio->qhandle->sb_user--;
		if (aio->qhandle->sb_user == 0) {
			(void) _rdc_rsrv_diskq(krdc->group);
			rdc_fixlen(aio);
			(void) nsc_free_buf(aio->qhandle);
			aio->qhandle = NULL;
			aio->handle = NULL;
			_rdc_rlse_diskq(krdc->group);
		}
	}
	if ((group->count == 0) && (group->rdc_thrnum == 0)) {
		mutex_exit(QLOCK(q));
		/*
		 * Group now empty, so destroy
		 * Race with remove_from_group while write thread was running
		 */
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_flush_diskq: group being destroyed");
#endif
		mutex_enter(&group->diskqmutex);
		rdc_close_diskq(group);
		mutex_exit(&group->diskqmutex);
		rdc_delgroup(group);
		krdc->group = NULL;
		rdc_many_exit(krdc);
		return;
	}
	mutex_exit(QLOCK(q));
	rdc_many_exit(krdc);
}

/*
 * _rdc_remote_flush
 * Flush a single ANON block.
 * This function will flush from either the disk queue
 * or the memory queue. The appropriate locks must be
 * taken out etc, etc ...
 */
2038 static void
2039 _rdc_remote_flush(rdc_aio_t *aio)
2040 {
2041 	rdc_k_info_t *krdc = &rdc_k_info[aio->index];
2042 	rdc_u_info_t *urdc = &rdc_u_info[aio->index];
2043 	disk_queue *q = &krdc->group->diskq;
2044 	kmutex_t *qlock;
2045 	rdc_group_t *group;
2046 	nsc_buf_t *h = NULL;
2047 	int reserved = 0;
2048 	int rtype = RDC_RAW;
2049 	int rc;
2050 	uint_t maxseq;
2051 	struct netwriteres netret;
2052 	int waitq = 1;
2053 	int vflags;
2054 
2055 	group = krdc->group;
2056 	netret.vecdata.vecdata_val = NULL;
2057 	netret.vecdata.vecdata_len = 0;
2058 
2059 	/* Where did we get this aio from anyway? */
2060 	if (RDC_IS_DISKQ(group)) {
2061 		qlock = &group->diskq.disk_qlock;
2062 	} else {
2063 		qlock = &group->ra_queue.net_qlock;
2064 	}
2065 
2066 	/*
2067 	 * quench transmission if we are too far ahead of the
2068 	 * server Q, or it will overflow.
2069 	 * Must fail all requests while asyncdis is set.
2070 	 * It will be cleared when the last thread to be discarded
2071 	 * sets the asyncstall counter to zero.
2072 	 * Note the thread within rdc_net_write
2073 	 * also bumps the asyncstall counter.
2074 	 */
2075 
2076 	mutex_enter(qlock);
2077 	if (group->asyncdis) {
2078 		aio->iostatus = RDC_IO_CANCELLED;
2079 		mutex_exit(qlock);
2080 		goto failed;
2081 	}
2082 	/* don't go to sleep if we have gone logging! */
2083 	vflags = rdc_get_vflags(urdc);
2084 	if ((vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2085 		if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2086 			aio->iostatus = RDC_IO_CANCELLED;
2087 
2088 		mutex_exit(qlock);
2089 		goto failed;
2090 	}
2091 
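	/*
	 * Compute the highest sequence number we may transmit:
	 * one window of RDC_MAXPENDQ beyond the last acknowledged
	 * sequence, wrapping past RDC_NEWSEQ on overflow.  Sleep
	 * until our sequence number falls inside that window.
	 */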
2092 	while (maxseq = group->seqack + RDC_MAXPENDQ + 1,
2093 	    maxseq = (maxseq < group->seqack) ? maxseq + RDC_NEWSEQ + 1
2094 	    : maxseq, !RDC_INFRONT(aio->seq, maxseq)) {
2095 		group->asyncstall++;
2096 		ASSERT(!IS_STATE(urdc, RDC_LOGGING));
2097 		cv_wait(&group->asyncqcv, qlock);
2098 		group->asyncstall--;
2099 		ASSERT(group->asyncstall >= 0);
2100 		if (group->asyncdis) {
2101 			if (group->asyncstall == 0) {
2102 				group->asyncdis = 0;
2103 			}
2104 			aio->iostatus = RDC_IO_CANCELLED;
2105 			mutex_exit(qlock);
2106 			goto failed;
2107 		}
2108 		/*
2109 		 * See if we have gone into logging mode
2110 		 * since sleeping.
2111 		 */
2112 		vflags = rdc_get_vflags(urdc);
2113 		if (vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING)) {
2114 			if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2115 				aio->iostatus = RDC_IO_CANCELLED;
2116 
2117 			mutex_exit(qlock);
2118 			goto failed;
2119 		}
2120 	}
2121 	mutex_exit(qlock);
2122 
2123 	if ((krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2124 		mutex_enter(krdc->io_kstats->ks_lock);
2125 		kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2126 		mutex_exit(krdc->io_kstats->ks_lock);
2127 		waitq = 0;
2128 	}
2129 
2130 
2131 	rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
2132 	if (rc != 0) {
2133 #ifdef DEBUG
2134 		cmn_err(CE_WARN, "!_rdc_remote_flush: reserve, index %d, rc %d",
2135 		    aio->index, rc);
2136 #endif
2137 		goto failed;
2138 	}
2139 
2140 	reserved = 1;
2141 	/*
2142 	 * Case where we are multihop and calling with no ANON bufs
2143 	 * Need to do the read to fill the buf.
2144 	 */
2145 	if (!aio->handle) {
2146 		rc = nsc_alloc_buf(RDC_U_FD(krdc), aio->pos, aio->len,
2147 		    (aio->flag & ~NSC_WRITE) | NSC_READ, &h);
2148 		if (!RDC_SUCCESS(rc)) {
2149 #ifdef DEBUG
2150 			cmn_err(CE_WARN,
2151 			    "!_rdc_remote_flush: alloc_buf, index %d, pos %"
2152 			    NSC_SZFMT ", len %" NSC_SZFMT ", rc %d",
2153 			    aio->index, aio->pos, aio->len, rc);
2154 #endif
2155 
2156 			goto failed;
2157 		}
2158 		aio->handle = h;
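		/*
		 * Tag the handle as a locally allocated multihop read
		 * buffer, so the free paths below do not mistake it
		 * for part of a queue buffer.
		 */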
2159 		aio->handle->sb_user = RDC_NULLBUFREAD;
2160 	}
2161 
2162 	mutex_enter(qlock);
2163 	if (group->asyncdis) {
2164 		if (group->asyncstall == 0) {
2165 			group->asyncdis = 0;
2166 		}
2167 		aio->iostatus = RDC_IO_CANCELLED;
2168 		mutex_exit(qlock);
2169 		goto failed;
2170 	}
2171 	group->asyncstall++;
2172 	mutex_exit(qlock);
2173 
2174 
2175 	if (krdc->remote_index < 0) {
2176 		/*
2177 		 * this should be ok, we are flushing, not rev syncing.
2178 		 * remote_index could be -1 if we lost a race with
		 * resume and the flusher tries to flush an io from
2180 		 * another set that has not resumed
2181 		 */
2182 		krdc->remote_index = rdc_net_state(krdc->index, CCIO_SLAVE);
2183 		DTRACE_PROBE1(remote_index_negative, int, krdc->remote_index);
2184 
2185 	}
2186 
2187 	/*
	 * double check for logging, since there is no check in
	 * net_write().  Skip the write if we can; otherwise, if
	 * logging, avoid clearing the bit, as we don't know whose
	 * bit it may also be.
2192 	 */
2193 	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2194 		aio->iostatus = RDC_IO_CANCELLED;
2195 		mutex_enter(qlock);
2196 		group->asyncstall--;
2197 		mutex_exit(qlock);
2198 		goto failed;
2199 	}
2200 
2201 	rc = rdc_net_write(krdc->index, krdc->remote_index,
2202 	    aio->handle, aio->pos, aio->len, aio->seq, aio->qpos, &netret);
2203 
2204 	mutex_enter(qlock);
2205 	group->asyncstall--;
2206 	if (group->asyncdis) {
2207 		if (group->asyncstall == 0) {
2208 			group->asyncdis = 0;
2209 		}
2210 		aio->iostatus = RDC_IO_CANCELLED;
2211 		mutex_exit(qlock);
2212 		goto failed;
2213 	}
2214 
2215 	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2216 		mutex_exit(qlock);
2217 		aio->iostatus = RDC_IO_CANCELLED;
2218 		goto failed;
2219 	}
2220 
2221 	ASSERT(aio->handle);
2222 	if (rc != 0) {
2223 #ifdef DEBUG
2224 		cmn_err(CE_WARN,
2225 		    "!_rdc_remote_flush: write, index %d, pos %" NSC_SZFMT
2226 		    ", len %" NSC_SZFMT ", "
2227 		    "rc %d seq %u group seq %u seqack %u qpos %" NSC_SZFMT,
2228 		    aio->index, aio->pos, aio->len, rc, aio->seq,
2229 		    group->seq, group->seqack, aio->qpos);
2230 #endif
2231 		if (rc == ENOLINK) {
2232 			cmn_err(CE_WARN,
2233 			    "!Hard timeout detected (%d sec) "
2234 			    "on SNDR set %s:%s",
2235 			    rdc_rpc_tmout, urdc->secondary.intf,
2236 			    urdc->secondary.file);
2237 		}
2238 		mutex_exit(qlock);
2239 		goto failed;
2240 	} else {
2241 		aio->iostatus = RDC_IO_DONE;
2242 	}
2243 
2244 	if (RDC_IS_DISKQ(group)) {
2245 		/* free locally alloc'd handle */
2246 		if (aio->handle->sb_user == RDC_NULLBUFREAD) {
2247 			(void) nsc_free_buf(aio->handle);
2248 			aio->handle = NULL;
2249 		}
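		/*
		 * sb_user counts the aios still sharing this queue
		 * buffer; the last one to complete frees it.
		 */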
2250 		aio->qhandle->sb_user--;
2251 		if (aio->qhandle->sb_user == 0) {
2252 			(void) _rdc_rsrv_diskq(group);
2253 			rdc_fixlen(aio);
2254 			(void) nsc_free_buf(aio->qhandle);
2255 			aio->qhandle = NULL;
2256 			aio->handle = NULL;
2257 			_rdc_rlse_diskq(group);
2258 		}
2259 
2260 	} else {
2261 		(void) nsc_free_buf(aio->handle);
2262 		aio->handle = NULL;
2263 	}
2264 
2265 	mutex_exit(qlock);
2266 
2267 	_rdc_rlse_devs(krdc, rtype);
2268 
2269 	if (netret.result == 0) {
2270 		vflags = rdc_get_vflags(urdc);
2271 
2272 		if (!(vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2273 			RDC_CLR_BITMAP(krdc, aio->pos, aio->len, \
2274 			    0xffffffff, RDC_BIT_BUMP);
2275 
2276 			if (RDC_IS_DISKQ(krdc->group)) {
2277 				if (!IS_STATE(urdc, RDC_LOGGING)) {
2278 					/* tell queue data has been flushed */
2279 					rdc_clr_iohdr(krdc, aio->qpos);
2280 				} else { /* throw away queue, logging */
2281 					mutex_enter(qlock);
2282 					rdc_dump_iohdrs(q);
2283 					SET_QNXTIO(q, QHEAD(q));
2284 					SET_QCOALBOUNDS(q, QHEAD(q));
2285 					mutex_exit(qlock);
2286 				}
2287 			}
2288 		}
2289 
2290 		mutex_enter(qlock);
2291 		/*
2292 		 * Check to see if the reply has arrived out of
2293 		 * order, if so don't update seqack.
2294 		 */
2295 		if (!RDC_INFRONT(aio->seq, group->seqack)) {
2296 			group->seqack = aio->seq;
2297 		}
2298 #ifdef DEBUG
2299 		else {
2300 			rdc_ooreply++;
2301 		}
2302 #endif
2303 		if (group->asyncstall) {
2304 			cv_broadcast(&group->asyncqcv);
2305 		}
2306 		mutex_exit(qlock);
2307 	} else if (netret.result < 0) {
2308 		aio->iostatus = RDC_IO_FAILED;
2309 	}
2310 
2311 	/*
2312 	 * see if we have any pending async requests we can mark
2313 	 * as done.
2314 	 */
2315 
2316 	if (netret.vecdata.vecdata_len) {
2317 		net_pendvec_t *vecp;
2318 		net_pendvec_t *vecpe;
2319 		vecp = netret.vecdata.vecdata_val;
2320 		vecpe = netret.vecdata.vecdata_val + netret.vecdata.vecdata_len;
2321 		while (vecp < vecpe) {
2322 			rdc_k_info_t *krdcp = &rdc_k_info[vecp->pindex];
2323 			rdc_u_info_t *urdcp = &rdc_u_info[vecp->pindex];
2324 			/*
2325 			 * we must always still be in the same group.
2326 			 */
2327 			ASSERT(krdcp->group == group);
2328 			vflags = rdc_get_vflags(urdcp);
2329 
2330 			if (!(vflags &
2331 			    (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2332 				RDC_CLR_BITMAP(krdcp, vecp->apos, vecp->alen, \
2333 				    0xffffffff, RDC_BIT_BUMP);
2334 				if (RDC_IS_DISKQ(krdcp->group)) {
					if (!IS_STATE(urdcp, RDC_LOGGING)) {
						/* update queue info */
						rdc_clr_iohdr(krdcp, vecp->qpos);
2338 					} else { /* we've gone logging */
2339 						mutex_enter(qlock);
2340 						rdc_dump_iohdrs(q);
2341 						SET_QNXTIO(q, QHEAD(q));
2342 						SET_QCOALBOUNDS(q, QHEAD(q));
2343 						mutex_exit(qlock);
2344 					}
2345 				}
2346 			}
2347 
2348 			/*
2349 			 * see if we can re-start transmission
2350 			 */
2351 			mutex_enter(qlock);
2352 			if (!RDC_INFRONT(vecp->seq, group->seqack)) {
2353 				group->seqack = vecp->seq;
2354 			}
2355 #ifdef DEBUG
2356 			else {
2357 				rdc_ooreply++;
2358 			}
2359 #endif
2360 			DTRACE_PROBE1(pendvec_return, int, vecp->seq);
2361 
2362 			if (group->asyncstall) {
2363 				cv_broadcast(&group->asyncqcv);
2364 			}
2365 			mutex_exit(qlock);
2366 			vecp++;
2367 		}
2368 	}
2369 	if (netret.vecdata.vecdata_val)
2370 		kmem_free(netret.vecdata.vecdata_val,
2371 		    netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2372 	return;
2373 failed:
2374 
2375 	/* perhaps we have a few threads stuck .. */
2376 	if (group->asyncstall) {
2377 		group->asyncdis = 1;
2378 		cv_broadcast(&group->asyncqcv);
2379 	}
2380 	if (netret.vecdata.vecdata_val)
2381 		kmem_free(netret.vecdata.vecdata_val,
2382 		    netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2383 
2384 	mutex_enter(qlock);
2385 	if (RDC_IS_DISKQ(group)) {
		/* free locally alloc'd handle */
2387 		if ((aio->handle) &&
2388 		    (aio->handle->sb_user == RDC_NULLBUFREAD)) {
2389 			(void) nsc_free_buf(aio->handle);
2390 			aio->handle = NULL;
2391 		}
2392 		aio->qhandle->sb_user--;
2393 		if (aio->qhandle->sb_user == 0) {
2394 			(void) _rdc_rsrv_diskq(group);
2395 			rdc_fixlen(aio);
2396 			(void) nsc_free_buf(aio->qhandle);
2397 			aio->qhandle = NULL;
2398 			aio->handle = NULL;
2399 			_rdc_rlse_diskq(group);
2400 		}
2401 	} else {
2402 		if (aio->handle) {
2403 			(void) nsc_free_buf(aio->handle);
2404 			aio->handle = NULL;
2405 		}
2406 	}
2407 	mutex_exit(qlock);
2408 
2409 	if (reserved) {
2410 		_rdc_rlse_devs(krdc, rtype);
2411 	}
2412 
2413 	if ((waitq && krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2414 		mutex_enter(krdc->io_kstats->ks_lock);
2415 		kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2416 		mutex_exit(krdc->io_kstats->ks_lock);
2417 	}
2418 
2419 	/* make sure that the bit is still set */
2420 	RDC_CHECK_BIT(krdc, aio->pos, aio->len);
2421 
2422 	if (aio->iostatus != RDC_IO_CANCELLED)
2423 		aio->iostatus = RDC_IO_FAILED;
2424 }
2425 
2426 
2427 /*
2428  * rdc_drain_disk_queue
 * drain the disk queue for the whole group. Bail out if no
 * progress is made for about NUM_RETRIES seconds.
 * returns -1 if it bails before the queue is drained.
2432  */
2433 #define	NUM_RETRIES	15	/* Number of retries to wait if no progress */
2434 int
2435 rdc_drain_disk_queue(int index)
2436 {
2437 	rdc_k_info_t *krdc = &rdc_k_info[index];
2438 	volatile rdc_group_t *group;
2439 	volatile disk_queue *diskq;
2440 	int threads, counter;
2441 	long blocks;
2442 
2443 	/* Sanity checking */
	if (index >= rdc_max_sets)
2445 		return (0);
2446 
2447 	/*
2448 	 * If there is no group or diskq configured, we can leave now
2449 	 */
2450 	if (!(group = krdc->group) || !(diskq = &group->diskq))
2451 		return (0);
2452 
2453 	/*
2454 	 * No need to wait if EMPTY and threads are gone
2455 	 */
2456 	counter = 0;
2457 	while (!QEMPTY(diskq) || group->rdc_thrnum) {
2458 
2459 		/*
2460 		 * Capture counters to determine if progress is being made
2461 		 */
2462 		blocks = QBLOCKS(diskq);
2463 		threads = group->rdc_thrnum;
2464 
2465 		/*
2466 		 * Wait
2467 		 */
2468 		delay(HZ);
2469 
2470 		/*
2471 		 * Has the group or disk queue gone away while delayed?
2472 		 */
2473 		if (!(group = krdc->group) || !(diskq = &group->diskq))
2474 			return (0);
2475 
2476 		/*
2477 		 * Are we still seeing progress?
2478 		 */
2479 		if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
2480 			/*
2481 			 * No progress seen, increment retry counter
2482 			 */
2483 			if (counter++ > NUM_RETRIES) {
2484 				return (-1);
2485 			}
2486 		} else {
2487 			/*
2488 			 * Reset counter, as we've made progress
2489 			 */
2490 			counter = 0;
2491 		}
2492 	}
2493 
2494 	return (0);
2495 }
2496 
2497 /*
 * decide what needs to be drained, disk queue or memory queue,
 * and drain it
2500  */
2501 int
2502 rdc_drain_queue(int index)
2503 {
2504 	rdc_k_info_t *krdc = &rdc_k_info[index];
2505 	rdc_group_t *group = krdc->group;
2506 
2507 	if (!group)
2508 		return (0);
2509 
2510 	if (RDC_IS_DISKQ(group))
2511 		return (rdc_drain_disk_queue(index));
2512 	if (RDC_IS_MEMQ(group))
2513 		return (rdc_drain_net_queue(index));
2514 	/* oops.. */
2515 #ifdef DEBUG
2516 	cmn_err(CE_WARN, "!rdc_drain_queue: "
2517 	    "attempting drain of unknown Q type");
2518 #endif
2519 	return (0);
2520 }
2521 
2522 /*
2523  * rdc_drain_net_queue
2524  * drain the async network queue for the whole group. Bail out if nothing
2525  * happens in 20 sec
 * returns -1 if it bails before the queue is drained.
2527  */
2528 int
2529 rdc_drain_net_queue(int index)
2530 {
2531 	rdc_k_info_t *krdc = &rdc_k_info[index];
2532 	volatile net_queue *q;
2533 	int bail = 20;	/* bail out in about 20 secs */
2534 	nsc_size_t blocks;
2535 
2536 	/* Sanity checking */
	if (index >= rdc_max_sets)
2538 		return (0);
2539 	if (!krdc->group)
2540 		return (0);
2541 	/* LINTED */
2542 	if (!(q = &krdc->group->ra_queue))
2543 		return (0);
2544 
2545 	/* CONSTCOND */
2546 	while (1) {
2547 
2548 		if (((volatile rdc_aio_t *)q->net_qhead == NULL) &&
2549 		    (krdc->group->rdc_thrnum == 0)) {
2550 			break;
2551 		}
2552 
2553 		blocks = q->blocks;
2554 
2555 		q = (volatile net_queue *)&krdc->group->ra_queue;
2556 
2557 		if ((blocks == q->blocks) &&
2558 		    (--bail <= 0)) {
2559 			break;
2560 		}
2561 
2562 		delay(HZ);
2563 	}
2564 
2565 	if (bail <= 0)
2566 		return (-1);
2567 
2568 	return (0);
2569 }
2570 
2571 /*
2572  * rdc_dump_queue
2573  * We want to release all the blocks currently on the network flushing queue
2574  * We already have them logged in the bitmap.
2575  */
2576 void
2577 rdc_dump_queue(int index)
2578 {
2579 	rdc_k_info_t *krdc = &rdc_k_info[index];
2580 	rdc_aio_t *aio;
2581 	net_queue *q;
2582 	rdc_group_t *group;
2583 	disk_queue *dq;
2584 	kmutex_t *qlock;
2585 
2586 	group = krdc->group;
2587 
2588 	q = &group->ra_queue;
2589 	dq = &group->diskq;
2590 
2591 	/*
2592 	 * gotta have both locks here for diskq
2593 	 */
2594 
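	/*
	 * For a disk queue, first send the diskq->memq filler
	 * thread to sleep so it cannot repopulate the in-core
	 * queue while we dump it.
	 */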
2595 	if (RDC_IS_DISKQ(group)) {
2596 		mutex_enter(&q->net_qlock);
2597 		if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
2598 			int tries = 3;
2599 #ifdef DEBUG_DISKQ
2600 			cmn_err(CE_NOTE,
2601 			    "!dumpq sending diskq->memq flusher to sleep");
2602 #endif
2603 			q->qfflags |= RDC_QFILLSLEEP;
2604 			mutex_exit(&q->net_qlock);
2605 			while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
2606 				delay(5);
2607 			mutex_enter(&q->net_qlock);
2608 		}
2609 	}
2610 
2611 	if (RDC_IS_DISKQ(group)) {
2612 		qlock = &dq->disk_qlock;
2613 		(void) _rdc_rsrv_diskq(group);
2614 	} else {
2615 		qlock = &q->net_qlock;
2616 	}
2617 
2618 	mutex_enter(qlock);
2619 
2620 	group->seq = RDC_NEWSEQ;	/* reset the sequence number */
2621 	group->seqack = RDC_NEWSEQ;
2622 
2623 	/* if the q is on disk, dump the q->iohdr chain */
2624 	if (RDC_IS_DISKQ(group)) {
2625 		rdc_dump_iohdrs(dq);
2626 
2627 		/* back up the nxtio pointer */
2628 		SET_QNXTIO(dq, QHEAD(dq));
2629 		SET_QCOALBOUNDS(dq, QHEAD(dq));
2630 	}
2631 
2632 	while (q->net_qhead) {
2633 		rdc_k_info_t *tmpkrdc;
2634 		aio = q->net_qhead;
2635 		tmpkrdc = &rdc_k_info[aio->index];
2636 
2637 		if (RDC_IS_DISKQ(group)) {
2638 			aio->qhandle->sb_user--;
2639 			if (aio->qhandle->sb_user == 0) {
2640 				rdc_fixlen(aio);
2641 				(void) nsc_free_buf(aio->qhandle);
2642 				aio->qhandle = NULL;
2643 				aio->handle = NULL;
2644 			}
2645 		} else {
2646 			if (aio->handle) {
2647 				(void) nsc_free_buf(aio->handle);
2648 				aio->handle = NULL;
2649 			}
2650 		}
2651 
2652 		q->net_qhead = aio->next;
2653 		RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);
2654 
2655 		kmem_free(aio, sizeof (*aio));
2656 		if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(group)) {
2657 			mutex_enter(tmpkrdc->io_kstats->ks_lock);
2658 			kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
2659 			mutex_exit(tmpkrdc->io_kstats->ks_lock);
2660 		}
2661 
2662 	}
2663 
2664 	q->net_qtail = NULL;
2665 	q->blocks = 0;
2666 	q->nitems = 0;
2667 
2668 	/*
2669 	 * See if we have stalled threads.
2670 	 */
2672 	if (group->asyncstall) {
2673 		group->asyncdis = 1;
2674 		cv_broadcast(&group->asyncqcv);
2675 	}
2676 	mutex_exit(qlock);
2677 	if (RDC_IS_DISKQ(group)) {
2678 		mutex_exit(&q->net_qlock);
2679 		_rdc_rlse_diskq(group);
2680 	}
2681 
2682 }
2683 
2684 
2685 /*
2686  * rdc_clnt_get
2687  * Get a CLIENT handle and cache it
2688  */
2689 
2690 static int
2691 rdc_clnt_get(rdc_srv_t *svp, rpcvers_t vers, struct chtab **rch, CLIENT **clp)
2692 {
2693 	uint_t	max_msgsize;
2694 	int	retries;
2695 	int ret;
2696 	struct cred		*cred;
2697 	int num_clnts = 0;
2698 	register struct chtab *ch;
2699 	struct chtab **plistp;
2700 	CLIENT *client = 0;
2701 
2702 	if (rch) {
2703 		*rch = 0;
2704 	}
2705 
2706 	if (clp) {
2707 		*clp = 0;
2708 	}
2709 
2710 	retries = 6;	/* Never used for COTS in Solaris */
2711 	cred = ddi_get_cred();
2712 	max_msgsize = RDC_RPC_MAX;
2713 
2714 	mutex_enter(&rdc_clnt_lock);
2715 
2716 	ch = rdc_chtable;
2717 	plistp = &rdc_chtable;
2718 
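	/*
	 * The cache is two-level: the ch_next chain holds one entry
	 * per (program, version, device, protocol family) tuple, and
	 * each entry heads a ch_list chain of up to MAXCLIENTS
	 * cached client handles.
	 */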
2719 	/* find the right ch_list chain */
2720 
2721 	for (ch = rdc_chtable; ch != NULL; ch = ch->ch_next) {
2722 		if (ch->ch_prog == RDC_PROGRAM &&
2723 		    ch->ch_vers == vers &&
2724 		    ch->ch_dev == svp->ri_knconf->knc_rdev &&
2725 		    ch->ch_protofmly != NULL &&
2726 		    strcmp(ch->ch_protofmly,
2727 		    svp->ri_knconf->knc_protofmly) == 0) {
2728 			/* found the correct chain to walk */
2729 			break;
2730 		}
2731 		plistp = &ch->ch_next;
2732 	}
2733 
2734 	if (ch != NULL) {
2735 		/* walk the ch_list and try and find a free client */
2736 
2737 		for (num_clnts = 0; ch != NULL; ch = ch->ch_list, num_clnts++) {
2738 			if (ch->ch_inuse == FALSE) {
2739 				/* suitable handle to reuse */
2740 				break;
2741 			}
2742 			plistp = &ch->ch_list;
2743 		}
2744 	}
2745 
2746 	if (ch == NULL && num_clnts >= MAXCLIENTS) {
2747 		/* alloc a temporary handle and return */
2748 
2749 		rdc_clnt_toomany++;
2750 		mutex_exit(&rdc_clnt_lock);
2751 
2752 		ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2753 		    RDC_PROGRAM, vers, max_msgsize, retries, cred, &client);
2754 
2755 		if (ret != 0) {
2756 			cmn_err(CE_NOTE,
2757 			    "!rdc_call: tli_kcreate failed %d", ret);
2758 			return (ret);
2759 		}
2760 
2761 		*rch = 0;
2762 		*clp = client;
2763 		(void) CLNT_CONTROL(client, CLSET_PROGRESS, NULL);
2764 		return (ret);
2765 	}
2766 
2767 	if (ch != NULL) {
2768 		/* reuse a cached handle */
2769 
2770 		ch->ch_inuse = TRUE;
2771 		ch->ch_timesused++;
2772 		mutex_exit(&rdc_clnt_lock);
2773 
2774 		*rch = ch;
2775 
2776 		if (ch->ch_client == NULL) {
2777 			ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2778 			    RDC_PROGRAM, vers, max_msgsize, retries,
2779 			    cred, &ch->ch_client);
2780 			if (ret != 0) {
2781 				ch->ch_inuse = FALSE;
2782 				return (ret);
2783 			}
2784 
2785 			(void) CLNT_CONTROL(ch->ch_client, CLSET_PROGRESS,
2786 			    NULL);
2787 			*clp = ch->ch_client;
2788 
2789 			return (0);
2790 		} else {
			/*
			 * Consecutive calls to CLNT_CALL() on the same client
			 * handle get the same transaction ID.  We want a new
			 * xid per call, so we first reinitialise the handle.
			 */
2796 			(void) clnt_tli_kinit(ch->ch_client, svp->ri_knconf,
2797 			    &(svp->ri_addr), max_msgsize, retries, cred);
2798 
2799 			*clp = ch->ch_client;
2800 			return (0);
2801 		}
2802 	}
2803 
2804 	/* create new handle and cache it */
2805 	ch = (struct chtab *)kmem_zalloc(sizeof (*ch), KM_SLEEP);
2806 
2807 	if (ch) {
2808 		ch->ch_inuse = TRUE;
2809 		ch->ch_prog = RDC_PROGRAM;
2810 		ch->ch_vers = vers;
2811 		ch->ch_dev = svp->ri_knconf->knc_rdev;
2812 		ch->ch_protofmly = (char *)kmem_zalloc(
2813 		    strlen(svp->ri_knconf->knc_protofmly)+1, KM_SLEEP);
2814 		if (ch->ch_protofmly)
2815 			(void) strcpy(ch->ch_protofmly,
2816 			    svp->ri_knconf->knc_protofmly);
2817 		*plistp = ch;
2818 	}
2819 
2820 	mutex_exit(&rdc_clnt_lock);
2821 
2822 	ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2823 	    RDC_PROGRAM, vers, max_msgsize, retries, cred, clp);
2824 
2825 	if (ret != 0) {
2826 		if (ch)
2827 			ch->ch_inuse = FALSE;
2828 		cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret);
2829 		return (ret);
2830 	}
2831 
2832 	*rch = ch;
2833 	if (ch)
2834 		ch->ch_client = *clp;
2835 
2836 	(void) CLNT_CONTROL(*clp, CLSET_PROGRESS, NULL);
2837 
2838 	return (ret);
2839 }
2840 
2841 
2842 long rdc_clnt_count = 0;
2843 
2844 /*
2845  * rdc_clnt_call
2846  * Arguments:
2847  *	rdc_srv_t *svp - rdc servinfo
2848  *	rpcproc_t proc; - rpcid
2849  *	rpcvers_t vers; - protocol version
2850  *	xdrproc_t xargs;- xdr function
2851  *	caddr_t argsp;- args to xdr function
2852  *	xdrproc_t xres;- xdr function
2853  *	caddr_t resp;- args to xdr function
2854  *	struct timeval timeout;
2855  * Performs RPC client call using specific protocol and version
2856  */
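
/*
 * Example (a sketch, mirroring the call made from rdc_async6() below;
 * assumes a configured set with a valid krdc->lsrv, and data6/netret
 * already initialised):
 *
 *	struct timeval t;
 *
 *	t.tv_sec = rdc_rpc_tmout;
 *	t.tv_usec = 0;
 *	rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
 *	    xdr_net_data6, (char *)&data6, xdr_netwriteres,
 *	    (char *)&netret, &t);
 */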
2857 
2858 int
2859 rdc_clnt_call(rdc_srv_t *svp, rpcproc_t proc, rpcvers_t vers,
2860 		xdrproc_t xargs, caddr_t argsp,
2861 		xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2862 {
2863 	CLIENT *rh = NULL;
2864 	int err;
2865 	int tries = 0;
2866 	struct chtab *ch = NULL;
2867 
2868 	err = rdc_clnt_get(svp, vers, &ch, &rh);
2869 	if (err || !rh)
2870 		return (err);
2871 
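	/*
	 * Retryable errors bump tries in the default case below;
	 * the loop condition allows at most one retry of the call.
	 */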
2872 	do {
2873 		DTRACE_PROBE3(rdc_clnt_call_1,
2874 		    CLIENT *, rh, rpcproc_t, proc, xdrproc_t, xargs);
2875 
2876 		err = cl_call_sig(rh, proc, xargs, argsp, xres, resp, *timeout);
2877 
2878 		DTRACE_PROBE1(rdc_clnt_call_end, int, err);
2879 
2880 		switch (err) {
2881 			case RPC_SUCCESS: /* bail now */
2882 				goto done;
2883 			case RPC_INTR:	/* No recovery from this */
2884 				goto done;
2885 			case RPC_PROGVERSMISMATCH:
2886 				goto done;
2887 			case RPC_TLIERROR:
2888 				/* fall thru */
2889 			case RPC_XPRTFAILED:
2890 				/* Delay here to err on side of caution */
2891 				/* fall thru */
2892 			case RPC_VERSMISMATCH:
2893 
2894 			default:
2895 				if (IS_UNRECOVERABLE_RPC(err)) {
2896 					goto done;
2897 				}
2898 				tries++;
				/*
				 * The call is in progress (over COTS).
				 * Try the CLNT_CALL again, but don't
				 * print a noisy error message.
				 */
2904 				if (err == RPC_INPROGRESS)
2905 					break;
2906 				cmn_err(CE_NOTE, "!SNDR client: err %d %s",
2907 				    err, clnt_sperrno(err));
2908 			}
2909 	} while (tries && (tries < 2));
2910 done:
2911 	++rdc_clnt_count;
2912 	rdc_clnt_free(ch, rh);
2913 	return (err);
2914 }
2915 
2916 
2917 /*
2918  * Call an rpc from the client side, not caring which protocol is used.
2919  */
2920 int
2921 rdc_clnt_call_any(rdc_srv_t *svp, rdc_if_t *ip, rpcproc_t proc,
2922 		xdrproc_t xargs, caddr_t argsp,
2923 		xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2924 {
2925 	rpcvers_t vers;
2926 	int rc;
2927 
2928 	if (ip != NULL) {
2929 		vers = ip->rpc_version;
2930 	} else {
2931 		vers = RDC_VERS_MAX;
2932 	}
2933 
2934 	do {
2935 		rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2936 		    xres, resp, timeout);
2937 
2938 		if (rc == RPC_PROGVERSMISMATCH) {
2939 			/*
2940 			 * Downgrade and try again.
2941 			 */
2942 			vers--;
2943 		}
2944 	} while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2945 
2946 	if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2947 		mutex_enter(&rdc_ping_lock);
2948 		ip->rpc_version = vers;
2949 		mutex_exit(&rdc_ping_lock);
2950 	}
2951 
2952 	return (rc);
2953 }
2954 
2955 /*
2956  * Call an rpc from the client side, starting with protocol specified
2957  */
2958 int
2959 rdc_clnt_call_walk(rdc_k_info_t *krdc, rpcproc_t proc, xdrproc_t xargs,
2960 		caddr_t argsp, xdrproc_t xres, caddr_t resp,
2961 		struct timeval *timeout)
2962 {
2963 	int rc;
2964 	rpcvers_t vers;
2965 	rdc_srv_t *svp = krdc->lsrv;
2966 	rdc_if_t *ip = krdc->intf;
2967 	vers = krdc->rpc_version;
2968 
2969 	do {
2970 		rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2971 		    xres, resp, timeout);
2972 
2973 		if (rc == RPC_PROGVERSMISMATCH) {
2974 			/*
2975 			 * Downgrade and try again.
2976 			 */
2977 			vers--;
2978 		}
2979 	} while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2980 
2981 	if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2982 		mutex_enter(&rdc_ping_lock);
2983 		ip->rpc_version = vers;
2984 		mutex_exit(&rdc_ping_lock);
2985 	}
2986 
2987 	return (rc);
2988 }
2989 
2990 /*
2991  * rdc_clnt_free
2992  * Free a client structure into the cache, or if this was a temporary
2993  * handle allocated above MAXCLIENTS, destroy it.
2994  */
2995 static void
2996 rdc_clnt_free(struct chtab *ch, CLIENT *clp)
2997 {
2998 	if (ch != NULL) {
2999 		/* cached client, just clear inuse flag and return */
3000 		ASSERT(ch->ch_client == clp);
3001 		ch->ch_inuse = FALSE;
3002 		return;
3003 	}
3004 
3005 	/* temporary handle allocated above MAXCLIENTS, so destroy it */
3006 
3007 	if (clp->cl_auth) {
3008 		AUTH_DESTROY(clp->cl_auth);
3009 		clp->cl_auth = 0;
3010 	}
3011 
3012 	CLNT_DESTROY(clp);
3013 }
3014 
3015 
3016 /*
3017  * _rdc_clnt_destroy
3018  * Free a chain (ch_list or ch_next) of cached clients
3019  */
3020 static int
3021 _rdc_clnt_destroy(struct chtab **p, const int list)
3022 {
3023 	struct chtab *ch;
3024 	int leak = 0;
3025 
3026 	if (!p)
3027 		return (0);
3028 
3029 	while (*p != NULL) {
3030 		ch = *p;
3031 
3032 		/*
3033 		 * unlink from the chain
3034 		 * - this leaks the client if it was inuse
3035 		 */
3036 
3037 		*p = list ? ch->ch_list : ch->ch_next;
3038 
3039 		if (!ch->ch_inuse) {
3040 			/* unused client - destroy it */
3041 
3042 			if (ch->ch_client) {
3043 				if (ch->ch_client->cl_auth) {
3044 					AUTH_DESTROY(ch->ch_client->cl_auth);
3045 					ch->ch_client->cl_auth = 0;
3046 				}
3047 
3048 				CLNT_DESTROY(ch->ch_client);
3049 				ch->ch_client = 0;
3050 			}
3051 
3052 			if (ch->ch_protofmly)
3053 				kmem_free(ch->ch_protofmly,
3054 				    strlen(ch->ch_protofmly)+1);
3055 
3056 			kmem_free(ch, sizeof (*ch));
3057 		} else {
3058 			/* remember client leak */
3059 			leak++;
3060 		}
3061 	}
3062 
3063 	return (leak);
3064 }
3065 
3066 
3067 /*
3068  * rdc_clnt_destroy
3069  * Free client caching table on unconfigure
3070  */
3071 void
3072 rdc_clnt_destroy(void)
3073 {
3074 	struct chtab *ch;
3075 	int leak = 0;
3076 
3077 	mutex_enter(&rdc_clnt_lock);
3078 
3079 	/* destroy each ch_list chain */
3080 
3081 	for (ch = rdc_chtable; ch; ch = ch->ch_next) {
3082 		leak += _rdc_clnt_destroy(&ch->ch_list, 1);
3083 	}
3084 
3085 	/* destroy the main ch_next chain */
3086 	leak += _rdc_clnt_destroy(&rdc_chtable, 0);
3087 
3088 	if (leak) {
3089 		/* we are about to leak clients */
3090 		cmn_err(CE_WARN,
3091 		    "!rdc_clnt_destroy: leaking %d inuse clients", leak);
3092 	}
3093 
3094 	mutex_exit(&rdc_clnt_lock);
3095 }
3096 
3097 #ifdef	DEBUG
3098 /*
3099  * Function to send an asynchronous net_data6 request
 * directly to a server to allow the generation of
 * out-of-order requests for ZatoIchi tests.
3102  */
3103 int
3104 rdc_async6(void *arg, int mode, int *rvp)
3105 {
3106 	int			index;
3107 	rdc_async6_t		async6;
3108 	struct net_data6	data6;
3109 	rdc_k_info_t		*krdc;
3110 	rdc_u_info_t		*urdc;
3111 	char			*data;
3112 	int			datasz;
3113 	char			*datap;
3114 	int			rc;
3115 	struct timeval		t;
3116 	struct netwriteres	netret;
3117 	int i;
3118 
3119 	rc = 0;
3120 	*rvp = 0;
3121 	/*
3122 	 * copyin the user's arguments.
3123 	 */
3124 	if (ddi_copyin(arg, &async6, sizeof (async6), mode) < 0) {
3125 		return (EFAULT);
3126 	}
3127 
3128 	/*
3129 	 * search by the secondary host and file.
3130 	 */
3131 	mutex_enter(&rdc_conf_lock);
3132 	for (index = 0; index < rdc_max_sets; index++) {
3133 		urdc = &rdc_u_info[index];
3134 		krdc = &rdc_k_info[index];
3135 
3136 		if (!IS_CONFIGURED(krdc))
3137 			continue;
3138 		if (!IS_ENABLED(urdc))
3139 			continue;
3140 		if (!IS_ASYNC(urdc))
3141 			continue;
3142 		if (krdc->rpc_version < RDC_VERSION6)
3143 			continue;
3144 
3145 		if ((strncmp(urdc->secondary.intf, async6.sechost,
3146 		    MAX_RDC_HOST_SIZE) == 0) &&
3147 		    (strncmp(urdc->secondary.file, async6.secfile,
3148 		    NSC_MAXPATH) == 0)) {
3149 			break;
3150 		}
3151 	}
3152 	mutex_exit(&rdc_conf_lock);
3153 	if (index >= rdc_max_sets) {
3154 		return (ENOENT);
3155 	}
3156 
3157 	if (async6.spos != -1) {
3158 		if ((async6.spos < async6.pos) ||
3159 		    ((async6.spos + async6.slen) >
3160 		    (async6.pos + async6.len))) {
3161 			cmn_err(CE_WARN, "!Sub task not within range "
3162 			    "start %d length %d sub start %d sub length %d",
3163 			    async6.pos, async6.len, async6.spos, async6.slen);
3164 			return (EIO);
3165 		}
3166 	}
3167 
3168 	datasz = FBA_SIZE(1);
3169 	data = kmem_alloc(datasz, KM_SLEEP);
3170 	datap = data;
3171 	while (datap < &data[datasz]) {
3172 		/* LINTED */
3173 		*datap++ = async6.pat;
3174 	}
3175 
3176 	/*
3177 	 * Fill in the net databuffer prior to transmission.
3178 	 */
3179 
3180 	data6.local_cd = krdc->index;
3181 	if (krdc->remote_index == -1) {
3182 		cmn_err(CE_WARN, "!Remote index not known");
3183 		kmem_free(data, datasz);
3184 		return (EIO);
3185 	} else {
3186 		data6.cd = krdc->remote_index;
3187 	}
3188 	data6.pos = async6.pos;
3189 	data6.len = async6.len;
3190 	data6.flag = 0;
3191 	data6.idx = async6.idx;
3192 	data6.seq = async6.seq;
3193 
3194 	if (async6.spos == -1) {
3195 		data6.sfba = async6.pos;
3196 		data6.nfba = async6.len;
3197 		data6.endoblk = 1;
3198 
3199 	} else {
3200 		data6.sfba = async6.spos;
3201 		data6.nfba = async6.slen;
3202 		data6.endoblk = async6.endind;
3203 	}
3204 
3205 	data6.data.data_len = datasz;
3206 	data6.data.data_val = data;
3207 
3208 	t.tv_sec = rdc_rpc_tmout;
3209 	t.tv_usec = 0;
3210 
3211 	netret.vecdata.vecdata_val = NULL;
3212 	netret.vecdata.vecdata_len = 0;
3213 
3214 
3215 	rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
3216 	    xdr_net_data6, (char *)&data6, xdr_netwriteres, (char *)&netret,
3217 	    &t);
3218 
3219 	kmem_free(data, datasz);
3220 	if (rc == 0) {
3221 		if (netret.result < 0) {
3222 			rc = -netret.result;
3223 		}
3224 		cmn_err(CE_NOTE, "!async6: seq %u result %d index %d "
3225 		    "pendcnt %d",
3226 		    netret.seq, netret.result, netret.index,
3227 		    netret.vecdata.vecdata_len);
3228 		for (i = 0; i < netret.vecdata.vecdata_len; i++) {
3229 			net_pendvec_t pvec;
3230 			bcopy(netret.vecdata.vecdata_val + i, &pvec,
3231 			    sizeof (net_pendvec_t));
3232 			cmn_err(CE_NOTE, "!Seq %u pos %llu len %llu",
3233 			    pvec.seq, (unsigned long long)pvec.apos,
3234 			    (unsigned long long)pvec.alen);
3235 		}
3236 		if (netret.vecdata.vecdata_val)
3237 			kmem_free(netret.vecdata.vecdata_val,
3238 			    netret.vecdata.vecdata_len *
3239 			    sizeof (net_pendvec_t));
3240 	} else {
3241 		cmn_err(CE_NOTE, "!async6: rpc call failed %d", rc);
3242 	}
3243 	*rvp = netret.index;
3244 	return (rc);
3245 }
3246 
3247 /*
 * Function to send a net_read6 request
 * directly to a server to allow the generation of
3250  * read requests.
3251  */
3252 int
3253 rdc_readgen(void *arg, int mode, int *rvp)
3254 {
3255 	int			index;
3256 	rdc_readgen_t		readgen;
3257 	rdc_readgen32_t		readgen32;
3258 	struct rread6		read6;
3259 	struct rread		read5;
3260 	rdc_k_info_t		*krdc;
3261 	int			ret;
3262 	struct timeval		t;
3263 	struct rdcrdresult	rr;
3264 	int			err;
3265 
3266 	*rvp = 0;
3267 	rr.rr_bufsize = 0;	/* rpc data buffer length (bytes) */
3268 	rr.rr_data = NULL;	/* rpc data buffer */
3269 	if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) {
3270 		if (ddi_copyin(arg, &readgen32, sizeof (readgen32), mode)) {
3271 			return (EFAULT);
3272 		}
3273 		(void) strncpy(readgen.sechost, readgen32.sechost,
3274 		    MAX_RDC_HOST_SIZE);
3275 		(void) strncpy(readgen.secfile, readgen32.secfile, NSC_MAXPATH);
3276 		readgen.len = readgen32.len;
3277 		readgen.pos = readgen32.pos;
3278 		readgen.idx = readgen32.idx;
3279 		readgen.flag = readgen32.flag;
3280 		readgen.data = (void *)(unsigned long)readgen32.data;
3281 		readgen.rpcversion = readgen32.rpcversion;
3282 	} else {
3283 		if (ddi_copyin(arg, &readgen, sizeof (readgen), mode)) {
3284 			return (EFAULT);
3285 		}
3286 	}
3287 	switch (readgen.rpcversion) {
3288 	case 5:
3289 	case 6:
3290 		break;
3291 	default:
3292 		return (EINVAL);
3293 	}
3294 
3295 	mutex_enter(&rdc_conf_lock);
3296 	index = rdc_lookup_byhostdev(readgen.sechost, readgen.secfile);
3297 	if (index >= 0) {
3298 		krdc = &rdc_k_info[index];
3299 	}
3300 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
3301 		mutex_exit(&rdc_conf_lock);
3302 		return (ENODEV);
3303 	}
3304 	/*
3305 	 * we should really call setbusy here.
3306 	 */
3307 	mutex_exit(&rdc_conf_lock);
3308 
3309 	t.tv_sec = rdc_rpc_tmout;
3310 	t.tv_usec = 0;
3311 	if (krdc->remote_index == -1) {
3312 		cmn_err(CE_WARN, "!Remote index not known");
3313 		ret = EIO;
3314 		goto out;
3315 	}
3316 	if (readgen.rpcversion == 6) {
3317 		read6.cd = krdc->remote_index;
3318 		read6.len = readgen.len;
3319 		read6.pos = readgen.pos;
3320 		read6.idx = readgen.idx;
3321 		read6.flag = readgen.flag;
3322 	} else {
3323 		read5.cd = krdc->remote_index;
3324 		read5.len = readgen.len;
3325 		read5.pos = readgen.pos;
3326 		read5.idx = readgen.idx;
3327 		read5.flag = readgen.flag;
3328 	}
3329 
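	/*
	 * A generated read takes two calls: the RDC_RREAD_START
	 * call starts the read on the server and returns an int
	 * status, and a follow-up call (without RDC_RREAD_START)
	 * collects the data in an rdcrdresult.
	 */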
3330 	if (readgen.flag & RDC_RREAD_START) {
3331 		if (readgen.rpcversion == 6) {
3332 			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3333 			    RDC_VERSION6, xdr_rread6, (char *)&read6,
3334 			    xdr_int, (char *)&ret, &t);
3335 		} else {
3336 			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3337 			    RDC_VERSION5, xdr_rread, (char *)&read5,
3338 			    xdr_int, (char *)&ret, &t);
3339 		}
3340 		if (err == 0) {
3341 			*rvp = ret;
3342 			ret = 0;
3343 		} else {
3344 			ret = EPROTO;
3345 		}
3346 	} else {
3347 		if (readgen.rpcversion == 6) {
3348 			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3349 			    RDC_VERSION6, xdr_rread6, (char *)&read6,
3350 			    xdr_rdresult, (char *)&rr, &t);
3351 		} else {
3352 			err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3353 			    RDC_VERSION5, xdr_rread, (char *)&read5,
3354 			    xdr_rdresult, (char *)&rr, &t);
3355 		}
3356 		if (err == 0) {
3357 			if (rr.rr_status != RDC_OK) {
3358 				ret = EIO;
3359 				goto out;
3360 			}
3361 			*rvp = rr.rr_bufsize;
3362 			if (ddi_copyout(rr.rr_data, readgen.data,
3363 			    rr.rr_bufsize, mode) != 0) {
3364 				ret = EFAULT;
3365 				goto out;
3366 			}
3367 			ret = 0;
3368 		} else {
3369 			ret = EPROTO;
3370 			goto out;
3371 		}
3372 	}
3373 out:
3374 	if (rr.rr_data) {
3375 		kmem_free(rr.rr_data, rr.rr_bufsize);
3376 	}
3377 	return (ret);
3378 }
3379 
3380 
3381 #endif
3382