xref: /titanic_51/usr/src/uts/common/avs/ns/rdc/rdc_svc.c (revision 1fd2cb30ed9441dcd42f7250881c1f8d075723a9)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * kRPC Server for sndr
28  */
29 
30 #include <sys/types.h>
31 #include <sys/ksynch.h>
32 #include <sys/cmn_err.h>
33 #include <sys/kmem.h>
34 #include <sys/cred.h>
35 #include <sys/conf.h>
36 #include <sys/stream.h>
37 #include <sys/errno.h>
38 
39 #include <sys/unistat/spcs_s.h>
40 #include <sys/unistat/spcs_s_k.h>
41 #include <sys/unistat/spcs_errors.h>
42 
43 #ifdef _SunOS_2_6
44 /*
45  * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
46  * define enum_t here as it is all we need from rpc/types.h
47  * anyway and make it look like we included it. Yuck.
48  */
49 #define	_RPC_TYPES_H
50 typedef int enum_t;
51 #else
52 #ifndef DS_DDICT
53 #include <rpc/types.h>
54 #endif
55 #endif /* _SunOS_2_6 */
56 
57 #ifndef DS_DDICT
58 #include <rpc/auth.h>
59 #include <rpc/svc.h>
60 #include <rpc/xdr.h>
61 #endif
62 #include <sys/ddi.h>
63 #include <sys/nsc_thread.h>
64 #ifdef DS_DDICT
65 #include <sys/nsctl/contract.h>
66 #endif
67 #include <sys/nsctl/nsctl.h>
68 #include <sys/ncall/ncall.h>
69 
70 #include <sys/sdt.h>		/* dtrace is S10 or later */
71 
72 #include "rdc_io.h"
73 #include "rdc_bitmap.h"
74 #include "rdcsrv.h"
75 
76 static rdc_sleepq_t *rdc_newsleepq();
77 static void rdc_delsleepq(rdc_sleepq_t *);
78 static int rdc_sleepq(rdc_group_t *, rdc_sleepq_t *);
79 static int rdc_combywrite(rdc_k_info_t *, nsc_buf_t *);
80 static int rdc_writemaxfba(rdc_k_info_t *, rdc_u_info_t *,
81     rdc_net_dataset_t *, uint_t, int);
82 static void rdc_setbitind(int *, net_pendvec_t *, rdc_net_dataset_t *, uint_t,
83     int, int);
84 static void rdc_dopending(rdc_group_t *, netwriteres *);
85 static nsc_vec_t *rdc_dset2vec(rdc_net_dataset_t *);
86 static int rdc_combyread(rdc_k_info_t *, rdc_u_info_t *, nsc_buf_t *);
87 static int rdc_readmaxfba(int, nsc_off_t, nsc_size_t, int);
88 static int rdc_dsetcopy(rdc_net_dataset_t *, nsc_vec_t *, nsc_off_t, nsc_size_t,
89     char *, int, int);
90 
91 /* direction for dsetcopy() */
92 #define	COPY_IN		1	/* copy data into the rpc buffer */
93 #define	COPY_OUT	2	/* copy data out of the rpc buffer */
94 
95 #define	MAX_EINTR_COUNT 1000
96 
97 static int rdc_rread_slow;
98 static rdcsrv_t rdc_srvtab[];
99 
100 #ifdef	DEBUG
101 static int rdc_netwrite6;
102 static int rdc_stall0;
103 static int rdc_sleepcnt;
104 int rdc_datasetcnt;
105 #endif
106 
107 
/*
 * Notify the user-level sync event daemon (blocked in
 * _rdc_sync_event_wait) that a sync operation event has occurred.
 *
 * operation - event code; the daemon receives it via *rvp in
 *	       _rdc_sync_event_wait.
 * volume    - master volume name copied into the shared event record.
 * group     - group name copied into the shared event record.
 *
 * Returns:
 *	 1  the daemon acknowledged positively
 *	-1  the daemon acknowledged negatively
 *	 0  no daemon waiting, or our wait for the answer was
 *	    signalled or timed out
 */
int
_rdc_sync_event_notify(int operation, char *volume, char *group)
{
	int ack = 0;
	clock_t time;

	/* rdc_sync_mutex serialises notifiers; event.mutex guards the record */
	mutex_enter(&rdc_sync_mutex);
	mutex_enter(&rdc_sync_event.mutex);

	if (rdc_sync_event.daemon_waiting) {
		/* hand the event over and wake the daemon */
		rdc_sync_event.daemon_waiting = 0;
		rdc_sync_event.event = operation;
		(void) strncpy(rdc_sync_event.master, volume, NSC_MAXPATH);
		(void) strncpy(rdc_sync_event.group, group, NSC_MAXPATH);

		cv_signal(&rdc_sync_event.cv);

		/* bounded, interruptible wait for the daemon's answer */
		rdc_sync_event.kernel_waiting = 1;
		time = cv_reltimedwait_sig(&rdc_sync_event.done_cv,
		    &rdc_sync_event.mutex, rdc_sync_event_timeout,
		    TR_CLOCK_TICK);
		if (time == (clock_t)0 || time == (clock_t)-1) {
			/* signalled or timed out */
			ack = 0;
		} else {
			if (rdc_sync_event.ack)
				ack = 1;
			else
				ack = -1;
		}
	}
	mutex_exit(&rdc_sync_event.mutex);
	mutex_exit(&rdc_sync_mutex);
	return (ack);
}
143 
144 
/*
 * Ioctl service routine: called by the user-level sync event daemon to
 * acknowledge the previous event and block for the next one posted by
 * _rdc_sync_event_notify().
 *
 * arg0    - user buffer; on entry carries the daemon's ack string for
 *	     the previous event (non-empty first byte == positive ack),
 *	     on exit receives the new event's master volume name.
 * arg1    - user buffer that receives the new event's group name.
 * mode    - ddi_copyin/ddi_copyout mode flags.
 * kstatus - spcs status handle to append error codes to.
 * rvp     - receives the event code of the new event.
 *
 * Returns 0 on success, EFAULT if the copyin fails, EAGAIN if the
 * wait for an event was interrupted by a signal.
 *
 * NOTE(review): "master" is static and is written before event.mutex
 * is taken, so two concurrent daemon callers would race on it —
 * presumably only a single daemon thread ever calls this; confirm.
 */
int
_rdc_sync_event_wait(void *arg0, void *arg1, int mode, spcs_s_info_t kstatus,
    int *rvp)
{
	int rc = 0;
	static char master[NSC_MAXPATH];

	master[0] = '\0';
	*rvp = 0;
	if (ddi_copyin(arg0, master, NSC_MAXPATH, mode))
		return (EFAULT);

	mutex_enter(&rdc_sync_event.mutex);

	/*
	 * Complete the previous handshake: if the kernel side is still
	 * waiting for an answer and we came back inside the timeout
	 * window, deliver the ack and wake it.
	 */
	if (rdc_sync_event.kernel_waiting &&
	    (rdc_sync_event.lbolt - nsc_lbolt() < rdc_sync_event_timeout)) {
		/* We haven't been away too long */
		if (master[0])
			rdc_sync_event.ack = 1;
		else
			rdc_sync_event.ack = 0;
		rdc_sync_event.kernel_waiting = 0;
		cv_signal(&rdc_sync_event.done_cv);
	}

	/* block until the next event is posted, or a signal arrives */
	rdc_sync_event.daemon_waiting = 1;
	if (cv_wait_sig(&rdc_sync_event.cv, &rdc_sync_event.mutex) == 0) {
		rdc_sync_event.daemon_waiting = 0;
		rc = EAGAIN;
		spcs_s_add(kstatus, rc);
	} else {
		/* copy the event details out to the daemon */
		(void) ddi_copyout(rdc_sync_event.master, arg0, NSC_MAXPATH,
		    mode);
		(void) ddi_copyout(rdc_sync_event.group, arg1, NSC_MAXPATH,
		    mode);
		*rvp = rdc_sync_event.event;
	}
	/* record when we left, for the timeout check on re-entry above */
	rdc_sync_event.lbolt = nsc_lbolt();
	mutex_exit(&rdc_sync_event.mutex);

	return (rc);
}
187 
188 
/*
 * Decide whether a sync of the given type may start on this set when it
 * is part of a multi-hop configuration, and perform the associated
 * 1-to-many flag bookkeeping.
 *
 * urdc   - the set for which a sync has been requested.
 * option - CCIO_RSYNC (reverse sync) or CCIO_SLAVE (forward sync).
 *
 * Returns 0 if the sync may proceed, -1 if it conflicts with the
 * state of the other half of the multi-hop configuration.
 */
static int
rdc_allow_sec_sync(rdc_u_info_t *urdc, int option)
{
	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
	rdc_k_info_t *ktmp;
	rdc_u_info_t *utmp;

	/* sets that are not multi-hop are always allowed */
	if (!IS_MULTI(krdc))
		return (0);

	rdc_many_enter(krdc);

	/* examine the other (multi-hop) half of the configuration */
	krdc = krdc->multi_next;
	urdc = &rdc_u_info[krdc->index];

	if (!IS_ENABLED(urdc)) {
		rdc_many_exit(krdc);
		return (0);
	}

	if (option == CCIO_RSYNC) {

		/* Reverse sync */

		if (rdc_get_mflags(urdc) & RDC_RSYNC_NEEDED) {
			/*
			 * Reverse sync needed or in progress.
			 */
			rdc_many_exit(krdc);
			return (-1);
		}
	} else {
		ASSERT(option == CCIO_SLAVE);

		/* Forward sync */

		if (rdc_get_mflags(urdc) & RDC_SLAVE) {
			/*
			 * Reverse syncing is bad, as that means that data
			 * is already flowing to the target of the requested
			 * sync operation.
			 */
			rdc_many_exit(krdc);
			return (-1);
		}

		/*
		 * Clear "reverse sync needed" on all 1-many volumes.
		 * The data on them will be updated from the primary of this
		 * requested sync operation, so the aborted reverse sync need
		 * not be completed.
		 */

		if ((rdc_get_mflags(urdc) & RDC_RSYNC_NEEDED) ||
		    (rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			rdc_clr_mflags(urdc, RDC_RSYNC_NEEDED);
			rdc_clr_flags(urdc, RDC_VOL_FAILED);
			rdc_write_state(urdc);
		}
		if (IS_MANY(krdc)) {
			/* walk the 1-to-many ring back around to krdc */
			for (ktmp = krdc->many_next; ktmp != krdc;
			    ktmp = ktmp->many_next) {
				utmp = &rdc_u_info[ktmp->index];
				if (!IS_ENABLED(utmp))
					continue;
				if (rdc_get_mflags(utmp) & RDC_RSYNC_NEEDED) {
					rdc_clr_mflags(utmp, RDC_RSYNC_NEEDED);
					rdc_write_state(utmp);
				}
			}
		}
	}

	rdc_many_exit(krdc);

	return (0);
}
266 
267 
268 /*
269  * r_net_null
270  * Proc 0 Null action
271  */
272 static void
273 r_net_null(SVCXPRT *xprt)
274 {
275 	(void) svc_sendreply(xprt, xdr_void, 0);
276 }
277 
278 /*
279  * r_net_read
280  */
/*
 * Secondary-side read service (v5 argument structure).
 *
 * A read is a conversation of RPCs distinguished by diskio.flag:
 *   RDC_RREAD_START - perform the disk read (rdc_readmaxfba) and reply
 *	with an int handle (the dataset id; 0 indicates failure);
 *   RDC_RREAD_DATA  - reply with a chunk of the previously read dataset;
 *   RDC_RREAD_END   - marks the final data chunk; the dataset is freed.
 * RDC_RREAD_FAIL in the flag disables NSC_NOCACHE for the read.
 */
static void
r_net_read(SVCXPRT *xprt)
{
	readres resp;
	rdc_u_info_t *urdc;
	struct rread diskio;
	char *buffer = NULL;	/* bounce buffer for the "slow" path */
	uchar_t *sv_addr;
	nsc_vec_t *vec;
	int pos, st;
	int nocache;
	int sv_len;
	nsc_vec_t *vector = NULL;
	rdc_net_dataset_t *dset = NULL;
	int vecsz = 0;

	st = SVC_GETARGS(xprt, xdr_rread, (char *)&diskio);
	if (!st) {
		(void) svc_sendreply(xprt, xdr_int, (char *)&st);
		return;
	}
	nocache = (diskio.flag & RDC_RREAD_FAIL) ? 0 : NSC_NOCACHE;

	/* validate the set index before touching rdc_u_info */
	if ((diskio.cd >= rdc_max_sets) || (diskio.cd < 0)) {
		resp.rr_status = RDCERR_NOENT;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!r_net_read: EPROTO cd out or not enabled");
#endif
		return;
	}

	urdc = &rdc_u_info[diskio.cd];

	if (diskio.flag & RDC_RREAD_START) {
		/* setup rpc */
		if (!IS_ENABLED(urdc)) {
			st = 0;
			(void) svc_sendreply(xprt, xdr_int, (char *)&st);
			return;
		}
		st = rdc_readmaxfba(diskio.cd, diskio.pos, diskio.len,
		    nocache);

		if (!svc_sendreply(xprt, xdr_int, (char *)&st)) {
			/*
			 * Reply failed: the client will never follow up
			 * with DATA rpcs, so release the dataset that
			 * rdc_readmaxfba just created (handle st).
			 */
			if (st != 0) {
				rdc_net_dataset_t *dset;
				if (dset = rdc_net_get_set(diskio.cd, st)) {
					rdc_net_del_set(diskio.cd, dset);
				} else {
					cmn_err(CE_NOTE, "!r_net_read: get_set "
					    "has failed in cleanup");
				}
			}
		}
		return;
	}

	/* data rpc */

#ifdef DEBUG
	if ((diskio.flag & RDC_RREAD_DATA) == 0) {
		cmn_err(CE_WARN, "!r_net_read: received non-DATA rpc! flag %x",
		    diskio.flag);
	}
#endif

	/* look up the dataset created by the START rpc */
	dset = rdc_net_get_set(diskio.cd, diskio.idx);
	if (dset) {
		vector = rdc_dset2vec(dset);
	}
	if (vector == NULL) {
		resp.rr_status = RDCERR_NOMEM;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
		goto cleanup;
	}
	vecsz = (dset->nitems + 1) * sizeof (nsc_vec_t);

	if (!IS_ENABLED(urdc)) {
		resp.rr_status = RDCERR_NOENT;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
		goto cleanup;
	}
	resp.rr_status = RDC_OK;

	/* find place in vector */
	vec = vector;
	pos = diskio.pos - dset->pos;

	for (; pos >= FBA_NUM(vec->sv_len); vec++)
		pos -= FBA_NUM(vec->sv_len);

	sv_addr = vec->sv_addr + FBA_SIZE(pos);
	sv_len = vec->sv_len - FBA_SIZE(pos);

	/*
	 * IF the data is in a single sb_vec entry
	 * THEN
	 *	we can just point to that
	 * ELSE
	 *	we have to alloc a local buffer,
	 *	copy the data in and the point to
	 *	the local buffer.
	 */

	if (sv_len >= FBA_SIZE(diskio.len)) {
		/* fast */
		resp.rr_data = (char *)sv_addr;
		resp.rr_bufsize = FBA_SIZE(diskio.len);
	} else {
		/* slow */
		rdc_rread_slow++;	/* rough count */
		resp.rr_bufsize = FBA_SIZE(diskio.len);
		buffer = kmem_alloc(resp.rr_bufsize, KM_NOSLEEP);
		if (!buffer) {
			resp.rr_status = RDCERR_NOMEM;
		} else {
			resp.rr_data = buffer;
			if (!rdc_dsetcopy(dset, vector, diskio.pos, diskio.len,
			    resp.rr_data, resp.rr_bufsize, COPY_IN)) {
				resp.rr_status = RDCERR_NOMEM; /* ??? */
			}
		}
	}

	st = svc_sendreply(xprt, xdr_readres, (char *)&resp); /* send data */

cleanup:

	if (dset) {
		if (!st ||
		    (diskio.flag & RDC_RREAD_END) ||
		    (resp.rr_status != RDC_OK)) {
			/*
			 * RPC reply failed, OR
			 * Last RPC for this IO operation, OR
			 * We are failing this IO operation.
			 *
			 * Do cleanup.
			 */
			rdc_net_del_set(diskio.cd, dset);
		} else {
			rdc_net_put_set(diskio.cd, dset);
		}
	}

	if (buffer)
		kmem_free(buffer, resp.rr_bufsize);
	if (vector) {
		kmem_free(vector, vecsz);
		RDC_DSMEMUSE(-vecsz);
	}
}
435 
436 /*
437  * r_net_read (v6)
438  */
/*
 * Secondary-side read service, protocol v6.
 *
 * Identical logic to r_net_read, but decodes the v6 (rread6) argument
 * structure.  Same conversation: RDC_RREAD_START performs the read and
 * replies with the dataset handle; RDC_RREAD_DATA rpcs return chunks;
 * RDC_RREAD_END marks the last chunk and frees the dataset.
 */
static void
r_net_read6(SVCXPRT *xprt)
{
	readres resp;
	rdc_u_info_t *urdc;
	struct rread6 diskio;
	char *buffer = NULL;	/* bounce buffer for the "slow" path */
	uchar_t *sv_addr;
	nsc_vec_t *vec;
	int pos, st;
	int nocache;
	int sv_len;
	nsc_vec_t *vector = NULL;
	rdc_net_dataset_t *dset = NULL;
	int vecsz = 0;

	st = SVC_GETARGS(xprt, xdr_rread6, (char *)&diskio);
	if (!st) {
		(void) svc_sendreply(xprt, xdr_int, (char *)&st);
		return;
	}
	nocache = (diskio.flag & RDC_RREAD_FAIL) ? 0 : NSC_NOCACHE;

	/* validate the set index before touching rdc_u_info */
	if ((diskio.cd >= rdc_max_sets) || (diskio.cd < 0)) {
		resp.rr_status = RDCERR_NOENT;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
#ifdef DEBUG
		cmn_err(CE_NOTE, "!r_net_read6: EPROTO cd out or not enabled");
#endif
		return;
	}

	urdc = &rdc_u_info[diskio.cd];

	if (diskio.flag & RDC_RREAD_START) {
		/* setup rpc */
		if (!IS_ENABLED(urdc)) {
			st = 0;
			(void) svc_sendreply(xprt, xdr_int, (char *)&st);
			return;
		}
		st = rdc_readmaxfba(diskio.cd, diskio.pos, diskio.len,
		    nocache);

		if (!svc_sendreply(xprt, xdr_int, (char *)&st)) {
			/*
			 * Reply failed: the client will never follow up
			 * with DATA rpcs, so release the dataset that
			 * rdc_readmaxfba just created (handle st).
			 */
			if (st != 0) {
				rdc_net_dataset_t *dset;
				if (dset = rdc_net_get_set(diskio.cd, st)) {
					rdc_net_del_set(diskio.cd, dset);
				} else {
					cmn_err(CE_NOTE, "!read6: get_set "
					    "has failed in cleanup");
				}
			}
		}
		return;
	}

	/* data rpc */

#ifdef DEBUG
	if ((diskio.flag & RDC_RREAD_DATA) == 0) {
		cmn_err(CE_WARN, "!read6: received non-DATA rpc! flag %x",
		    diskio.flag);
	}
#endif

	/* look up the dataset created by the START rpc */
	dset = rdc_net_get_set(diskio.cd, diskio.idx);
	if (dset) {
		vector = rdc_dset2vec(dset);
	}
	if (vector == NULL) {
		resp.rr_status = RDCERR_NOMEM;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
		goto cleanup;
	}
	vecsz = (dset->nitems + 1) * sizeof (nsc_vec_t);

	if (!IS_ENABLED(urdc)) {
		resp.rr_status = RDCERR_NOENT;
		(void) svc_sendreply(xprt, xdr_readres, (char *)&resp);
		goto cleanup;
	}
	resp.rr_status = RDC_OK;

	/* find place in vector */
	vec = vector;
	pos = diskio.pos - dset->pos;

	for (; pos >= FBA_NUM(vec->sv_len); vec++)
		pos -= FBA_NUM(vec->sv_len);

	sv_addr = vec->sv_addr + FBA_SIZE(pos);
	sv_len = vec->sv_len - FBA_SIZE(pos);

	/*
	 * IF the data is in a single sb_vec entry
	 * THEN
	 *	we can just point to that
	 * ELSE
	 *	we have to alloc a local buffer,
	 *	copy the data in and the point to
	 *	the local buffer.
	 */

	if (sv_len >= FBA_SIZE(diskio.len)) {
		/* fast */
		resp.rr_data = (char *)sv_addr;
		resp.rr_bufsize = FBA_SIZE(diskio.len);
	} else {
		/* slow */
		rdc_rread_slow++;	/* rough count */
		resp.rr_bufsize = FBA_SIZE(diskio.len);
		buffer = kmem_alloc(resp.rr_bufsize, KM_NOSLEEP);
		if (!buffer) {
			resp.rr_status = RDCERR_NOMEM;
		} else {
			resp.rr_data = buffer;
			if (!rdc_dsetcopy(dset, vector, diskio.pos, diskio.len,
			    resp.rr_data, resp.rr_bufsize, COPY_IN)) {
				resp.rr_status = RDCERR_NOMEM; /* ??? */
			}
		}
	}

	st = svc_sendreply(xprt, xdr_readres, (char *)&resp); /* send data */

cleanup:

	if (dset) {
		if (!st ||
		    (diskio.flag & RDC_RREAD_END) ||
		    (resp.rr_status != RDC_OK)) {
			/*
			 * RPC reply failed, OR
			 * Last RPC for this IO operation, OR
			 * We are failing this IO operation.
			 *
			 * Do cleanup.
			 */
			rdc_net_del_set(diskio.cd, dset);
		} else {
			rdc_net_put_set(diskio.cd, dset);
		}
	}

	if (buffer)
		kmem_free(buffer, resp.rr_bufsize);
	if (vector) {
		kmem_free(vector, vecsz);
		RDC_DSMEMUSE(-vecsz);
	}
}
592 
593 /*
594  * r_net_write (Version 5)
595  * 0 reply indicates error
596  * >0 reply indicates a net handle index
597  * <0 reply indicates errno
598  * ret net handle index
599  * ret2 general error
600  * ret3 multi-hop errors (never returned)
601  */
602 static void
603 r_net_write5(SVCXPRT *xprt)
604 {
605 	rdc_k_info_t *krdc;
606 	rdc_u_info_t *urdc;
607 	struct net_data5 diskio;
608 	rdc_net_dataset_t *dset;
609 	rdc_net_dataitem_t *ditem;
610 	int nocache;
611 	int ret = 0;
612 	int ret2 = 0;
613 	int st;
614 
615 	krdc = NULL;
616 	diskio.data.data_val = kmem_alloc(RDC_MAXDATA, KM_NOSLEEP);
617 
618 	if (!diskio.data.data_val) {
619 		ret2 = ENOMEM;
620 		goto out;
621 	}
622 	RDC_DSMEMUSE(RDC_MAXDATA);
623 	st = SVC_GETARGS(xprt, xdr_net_data5, (char *)&diskio);
624 	if (!st) {
625 		ret2 = ENOMEM;
626 #ifdef DEBUG
627 		cmn_err(CE_NOTE, "!r_net_write5:SVC_GETARGS failed: st %d", st);
628 #endif
629 		goto out;
630 	}
631 	if ((diskio.cd >= rdc_max_sets) || (diskio.cd < 0)) {
632 		ret2 = EPROTO;
633 #ifdef DEBUG
634 		cmn_err(CE_NOTE, "!r_net_write6: EPROTO cd out or not enabled");
635 #endif
636 		goto out;
637 	}
638 
639 	nocache = (diskio.flag & RDC_RWRITE_FAIL) ? 0 : NSC_NOCACHE;
640 	krdc = &rdc_k_info[diskio.cd];
641 	urdc = &rdc_u_info[diskio.cd];
642 
643 	if (!IS_ENABLED(urdc) || IS_STATE(urdc, RDC_LOGGING)) {
644 		ret2 = EPROTO;
645 #ifdef DEBUG
646 		cmn_err(CE_NOTE, "!r_net_write6: cd logging / not enabled (%x)",
647 		    rdc_get_vflags(urdc));
648 #endif
649 		krdc = NULL; /* so we don't try to unqueue kstat entry */
650 		goto out;
651 	}
652 
653 	if (krdc->io_kstats) {
654 		mutex_enter(krdc->io_kstats->ks_lock);
655 		kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
656 		mutex_exit(krdc->io_kstats->ks_lock);
657 	}
658 
659 
660 	/* -1 index says allocate a buffer */
661 	if (diskio.idx < 0) {
662 		dset = rdc_net_add_set(diskio.cd);
663 		if (dset == NULL) {
664 #ifdef DEBUG
665 			cmn_err(CE_NOTE, "!r_net_write5: "
666 			    "failed to add dataset");
667 #endif
668 			ret2 = EIO;
669 			goto out;
670 		} else {
671 			ret = dset->id;
672 			dset->pos = diskio.pos;
673 			dset->fbalen = diskio.len;
674 			diskio.idx = ret;
675 		}
676 		ditem = kmem_alloc(sizeof (rdc_net_dataitem_t), KM_NOSLEEP);
677 		if (ditem == NULL) {
678 			ret2 = ENOMEM;
679 			goto out;
680 		}
681 		RDC_DSMEMUSE(sizeof (rdc_net_dataitem_t));
682 		/*
683 		 * If this is a single transfer, then we don't
684 		 * need to allocate any memory for the data,
685 		 * just point the ditem data pointer to the
686 		 * existing buffer.
687 		 */
688 		ditem->next = NULL;
689 		if (diskio.endoblk) {
690 			ditem->dptr = diskio.data.data_val;
691 			/*
692 			 * So we don't free it twice.
693 			 */
694 			diskio.data.data_val = NULL;
695 			ditem->len = diskio.data.data_len;
696 			ditem->mlen = RDC_MAXDATA;
697 		} else {
698 			/*
699 			 * Allocate the memory for the complete
700 			 * transfer.
701 			 */
702 			ditem->dptr = kmem_alloc(FBA_SIZE(diskio.len),
703 			    KM_NOSLEEP);
704 			if (ditem->dptr == NULL) {
705 				ret2 = ENOMEM;
706 				goto out;
707 			}
708 			RDC_DSMEMUSE(FBA_SIZE(diskio.len));
709 			ditem->len = FBA_SIZE(diskio.len);
710 			ditem->mlen = ditem->len;
711 
712 			/*
713 			 * Copy the data to the new buffer.
714 			 */
715 			ASSERT(diskio.data.data_len == FBA_SIZE(diskio.nfba));
716 			bcopy(diskio.data.data_val, ditem->dptr,
717 			    diskio.data.data_len);
718 			/*
719 			 * free the old data buffer.
720 			 */
721 			kmem_free(diskio.data.data_val, RDC_MAXDATA);
722 			RDC_DSMEMUSE(-RDC_MAXDATA);
723 			diskio.data.data_val = NULL;
724 		}
725 		dset->head = ditem;
726 		dset->tail = ditem;
727 		dset->nitems++;
728 	} else {
729 		ret = diskio.idx;
730 		dset = rdc_net_get_set(diskio.cd, diskio.idx);
731 		if (dset == NULL) {
732 			ret2 = EPROTO;
733 #ifdef DEBUG
734 			cmn_err(CE_NOTE,
735 			    "!r_net_write5: net_get_set failed cd %d idx %d",
736 			    diskio.cd, diskio.idx);
737 #endif
738 			goto out;
739 		}
740 		/*
741 		 * We have to copy the data from the rpc buffer
742 		 * to the data in ditem.
743 		 */
744 		ditem = dset->head;
745 		bcopy(diskio.data.data_val, (char *)ditem->dptr +
746 		    FBA_SIZE(diskio.sfba - diskio.pos), diskio.data.data_len);
747 
748 		kmem_free(diskio.data.data_val, RDC_MAXDATA);
749 		RDC_DSMEMUSE(-RDC_MAXDATA);
750 		diskio.data.data_val = NULL;
751 	}
752 	ASSERT(dset);
753 
754 	if (diskio.endoblk) {
755 		ret2 = rdc_writemaxfba(krdc, urdc, dset, diskio.seq, nocache);
756 		rdc_net_del_set(diskio.cd, dset);
757 		dset = NULL;
758 	}
759 out:
760 	if (!RDC_SUCCESS(ret2)) {
761 		if (ret2 > 0)
762 			ret2 = -ret2;
763 		DTRACE_PROBE1(rdc_svcwrite5_err_ret2, int, ret2);
764 		st = svc_sendreply(xprt, xdr_int, (char *)&ret2);
765 	} else
766 		st = svc_sendreply(xprt, xdr_int, (char *)&ret);
767 
768 	if (krdc && krdc->io_kstats && ret2 != ENOMEM) {
769 		mutex_enter(krdc->io_kstats->ks_lock);
770 		kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
771 		mutex_exit(krdc->io_kstats->ks_lock);
772 	}
773 	/*
774 	 * On Error we must cleanup.
775 	 * If we have a handle, free it.
776 	 * If we have a network handle, free it.
777 	 */
778 	if (!st || !RDC_SUCCESS(ret2)) {
779 #ifdef DEBUG
780 		cmn_err(CE_WARN, "!r_net_write5 error case? st %x ret %d",
781 		    st, ret2);
782 #endif
783 		if (dset) {
784 			rdc_net_del_set(diskio.cd, dset);
785 		}
786 
787 	} else {
788 		if (dset) {
789 			rdc_net_put_set(diskio.cd, dset);
790 		}
791 	}
792 	if (diskio.data.data_val) {
793 		kmem_free(diskio.data.data_val, RDC_MAXDATA);
794 		RDC_DSMEMUSE(-RDC_MAXDATA);
795 	}
796 }
797 
798 /*
799  * r_net_write (Version 6)
800  * index 0 = error, or net handle index.
801  * result = 0 , ok.
802  * result = 1, pending write.
803  * result < 0 error, and is the -errno.
804  * ret net handle index.
805  * ret2 general error.
806  */
static void
r_net_write6(SVCXPRT *xprt)
{
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	rdc_group_t *group;
	struct net_data6 diskio;
	struct netwriteres netret;	/* reply: result/index/pending vector */
	rdc_net_dataset_t *dset;
	rdc_net_dataitem_t *ditem;
	int ret = 0;	/* net handle index reply */
	int ret2 = 0;	/* errno-style failure reply */
	int st;
	int nocache;

	netret.vecdata.vecdata_val = NULL;
	netret.vecdata.vecdata_len = 0;
	dset = NULL;
	krdc = NULL;
	diskio.data.data_val = kmem_alloc(RDC_MAXDATA, KM_NOSLEEP);

	if (!diskio.data.data_val) {
		ret2 = ENOMEM;
		goto out;
	}
	RDC_DSMEMUSE(RDC_MAXDATA);
	st = SVC_GETARGS(xprt, xdr_net_data6, (char *)&diskio);
	if (!st) {
		ret2 = ENOMEM;
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!r_net_write6:SVC_GETARGS failed: st  %d", st);
#endif
		goto out;
	}

	/* validate the set index before touching the info arrays */
	if ((diskio.cd >= rdc_max_sets) || (diskio.cd < 0)) {
		ret2 = EPROTO;
#ifdef DEBUG
		cmn_err(CE_NOTE, "!r_net_write6: EPROTO cd out or not enabled");
#endif
		goto out;
	}

	nocache = (diskio.flag & RDC_RWRITE_FAIL) ? 0 : NSC_NOCACHE;
	netret.seq = diskio.seq;

	krdc = &rdc_k_info[diskio.cd];
	urdc = &rdc_u_info[diskio.cd];

	if (!IS_ENABLED(urdc) || IS_STATE(urdc, RDC_LOGGING)) {
		ret2 = EPROTO;
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!r_net_write6: cd logging or not enabled (%x)",
		    rdc_get_vflags(urdc));
#endif
		krdc = NULL; /* so we don't try to unqueue kstat entry */
		goto out;
	}

	group = krdc->group;
	if (group == NULL) {
		ret2 = EIO;
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!r_net_write6: No group structure for set %s:%s",
		    urdc->secondary.intf, urdc->secondary.file);
#endif
		krdc = NULL; /* so we don't try to unqueue kstat entry */
		goto out;
	}

#ifdef DEBUG
	if (rdc_netwrite6) {
		cmn_err(CE_NOTE,
		    "!r_net_write6: idx %d seq %u current seq %u pos %llu "
		    "len %d sfba %llu nfba %d endoblk %d",
		    diskio.idx, diskio.seq, group->seq,
		    (unsigned long long)diskio.pos, diskio.len,
		    (unsigned long long)diskio.sfba, diskio.nfba,
		    diskio.endoblk);
	}
#endif

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	/* -1 index says allocate a net dataset */
	if (diskio.idx < 0) {
		dset = rdc_net_add_set(diskio.cd);
		if (dset == NULL) {
#ifdef DEBUG
			cmn_err(CE_NOTE,
			    "!r_net_write6: failed to add dataset");
#endif
			ret2 = EIO;
			goto out;
		} else {
			ret = dset->id;
			dset->pos = (nsc_off_t)diskio.pos; /* 64bit! */
			dset->fbalen = diskio.len;
			diskio.idx = ret;
		}
		ditem = kmem_alloc(sizeof (rdc_net_dataitem_t), KM_NOSLEEP);
		if (ditem == NULL) {
			ret2 = ENOMEM;
			goto out;
		}
		RDC_DSMEMUSE(sizeof (rdc_net_dataitem_t));
		/*
		 * If this is a single transfer, then we don't
		 * need to allocate any memory for the data,
		 * just point the ditem data pointer to the
		 * existing buffer.
		 */
		ditem->next = NULL;
		if (diskio.endoblk) {
			ditem->dptr = diskio.data.data_val;
			/*
			 * So we don't free it twice.
			 */
			diskio.data.data_val = NULL;
			ditem->len = diskio.data.data_len;
			ditem->mlen = RDC_MAXDATA;
		} else {
			/*
			 * Allocate the memory for the complete
			 * transfer.
			 */
			ditem->dptr = kmem_alloc(FBA_SIZE(diskio.len),
			    KM_NOSLEEP);
			if (ditem->dptr == NULL) {
				ret2 = ENOMEM;
				goto out;
			}
			RDC_DSMEMUSE(FBA_SIZE(diskio.len));
			ditem->len = FBA_SIZE(diskio.len);
			ditem->mlen = ditem->len;

			/*
			 * Copy the data to the new buffer.
			 */
			ASSERT(diskio.data.data_len == FBA_SIZE(diskio.nfba));
			bcopy(diskio.data.data_val, ditem->dptr,
			    diskio.data.data_len);
			/*
			 * free the old data buffer.
			 */
			kmem_free(diskio.data.data_val, RDC_MAXDATA);
			RDC_DSMEMUSE(-RDC_MAXDATA);
			diskio.data.data_val = NULL;
		}
		dset->head = ditem;
		dset->tail = ditem;
		dset->nitems++;
	} else {
		ret = diskio.idx;
		dset = rdc_net_get_set(diskio.cd, diskio.idx);
		if (dset == NULL) {
			ret2 = EPROTO;
#ifdef DEBUG
			cmn_err(CE_NOTE,
			    "!r_net_write6: net_get_set failed cd %d idx %d "
			    "packet sequence %u expected seq %u",
			    diskio.cd, diskio.idx, diskio.seq, group->seq);
#endif
			goto out;
		}
		/*
		 * We have to copy the data from the rpc buffer
		 * to the data in ditem.
		 */
		ditem = dset->head;
		bcopy(diskio.data.data_val, (char *)ditem->dptr +
		    FBA_SIZE(diskio.sfba - diskio.pos), diskio.data.data_len);

		kmem_free(diskio.data.data_val, RDC_MAXDATA);
		RDC_DSMEMUSE(-RDC_MAXDATA);
		diskio.data.data_val = NULL;
	}
	ASSERT(dset);

	/*
	 * endoblk: all data for this write has arrived.  Enforce the
	 * group's write-ordering sequence before issuing the disk write.
	 */
	if (diskio.endoblk) {
#ifdef DEBUG
		if (diskio.seq == (RDC_NEWSEQ + 1)) {
			rdc_stallzero(2);
		}
#endif
		if (diskio.seq == RDC_NEWSEQ) {
			/*
			 * magic marker, start of sequence.
			 */
			mutex_enter(&group->ra_queue.net_qlock);
			/*
			 * see if some threads are stuck.
			 */
			if (group->sleepq) {
				rdc_sleepqdiscard(group);
			}
			group->seqack = RDC_NEWSEQ;
			mutex_exit(&group->ra_queue.net_qlock);
		}

		if ((diskio.seq != RDC_NOSEQ) && (diskio.seq != RDC_NEWSEQ)) {
			/*
			 * see if we are allowed through here to
			 * do the write, or if we have to q the
			 * request and send back a pending reply.
			 */
			mutex_enter(&group->ra_queue.net_qlock);
			if (diskio.seq != group->seq) {
				rdc_sleepq_t	*sq;
				int maxseq;

				/*
				 * Check that we have room.
				 */
				maxseq = group->seqack + RDC_MAXPENDQ + 1;
				if (maxseq < group->seqack) {
					/*
					 * skip magic values.
					 */
					maxseq += RDC_NEWSEQ + 1;
				}
				if (!RDC_INFRONT(diskio.seq, maxseq)) {
#ifdef	DEBUG
					cmn_err(CE_WARN, "!net_write6: Queue "
					    "size %d exceeded seqack %u "
					    "this seq %u maxseq %u seq %u",
					    RDC_MAXPENDQ, group->seqack,
					    diskio.seq, maxseq, group->seq);
#endif
				DTRACE_PROBE2(qsize_exceeded, int, diskio.seq,
				    int, maxseq);
					if (!(rdc_get_vflags(urdc) &
					    RDC_VOL_FAILED)) {
						rdc_many_enter(krdc);
						rdc_set_flags(urdc,
						    RDC_VOL_FAILED);
						rdc_many_exit(krdc);
						rdc_write_state(urdc);
					}
					ret2 = EIO;
					rdc_sleepqdiscard(group);
					group->seq = RDC_NEWSEQ;
					group->seqack = RDC_NEWSEQ;
					mutex_exit(&group->ra_queue.net_qlock);
					goto out;
				}

				/* queue this out-of-order write for later */
				sq = rdc_newsleepq();
				sq->seq = diskio.seq;
				sq->sindex = diskio.cd;
				sq->pindex = diskio.local_cd;
				sq->idx = diskio.idx;
				sq->qpos = diskio.qpos;
				sq->nocache = nocache;
				if (rdc_sleepq(group, sq)) {
					ret2 = EIO;
					group->seq = RDC_NEWSEQ;
					group->seqack = RDC_NEWSEQ;
					rdc_sleepqdiscard(group);
					mutex_exit(&group->ra_queue.net_qlock);
					goto out;
				}
				rdc_net_put_set(diskio.cd, dset);
				dset = NULL;
				if (krdc->io_kstats) {
					mutex_enter(krdc->io_kstats->ks_lock);
					kstat_waitq_enter(KSTAT_IO_PTR(krdc->
					    io_kstats));
					mutex_exit(krdc->io_kstats->ks_lock);
				}
				mutex_exit(&group->ra_queue.net_qlock);
				/*
				 * pending state.
				 */
				netret.result = 1;
				netret.index = diskio.idx;
				st = svc_sendreply(xprt, xdr_netwriteres,
				    (char *)&netret);
				if (krdc->io_kstats && ret2 != ENOMEM) {
					mutex_enter(krdc->io_kstats->ks_lock);
					kstat_runq_exit(KSTAT_IO_PTR(
					    krdc->io_kstats));
					mutex_exit(krdc->io_kstats->ks_lock);
				}
				return;
			}
			mutex_exit(&group->ra_queue.net_qlock);
		}

		/* in order (or unsequenced): do the disk write now */
		ret2 = rdc_writemaxfba(krdc, urdc, dset, diskio.seq, nocache);
		rdc_net_del_set(diskio.cd, dset);
		dset = NULL;
#ifdef	DEBUG
		if (!RDC_SUCCESS(ret2)) {
			cmn_err(CE_WARN, "!r_net_write6: writemaxfba failed %d",
			    ret2);
		}
#endif
		if (diskio.seq != RDC_NOSEQ) {
			/*
			 * Advance the group sequence and drain any queued
			 * writes that are now in order (rdc_dopending).
			 */
			mutex_enter(&group->ra_queue.net_qlock);
			group->seq = diskio.seq + 1;
			if (group->seq < diskio.seq)
				group->seq = RDC_NEWSEQ + 1;
			if (group->sleepq &&
			    (group->sleepq->seq == group->seq)) {
				rdc_dopending(group, &netret);
			}
			group->seqack = group->seq;
			mutex_exit(&group->ra_queue.net_qlock);
		}
	}
out:
	if (!RDC_SUCCESS(ret2)) {
		DTRACE_PROBE1(rdc_svcwrite6_err_ret2, int, ret2);
		netret.result = -ret2;
	} else {
		netret.result = 0;
		netret.index = ret;
	}
	st = svc_sendreply(xprt, xdr_netwriteres, (char *)&netret);
	if (netret.vecdata.vecdata_val) {
		/* pending-write completion vector filled by rdc_dopending */
		kmem_free(netret.vecdata.vecdata_val,
		    netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
	}
	if (krdc && krdc->io_kstats && ret2 != ENOMEM) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}
	/*
	 * On Error we must cleanup.
	 * If we have a handle, free it.
	 * If we have a network handle, free it.
	 * If we hold the main nsc buffer, free it.
	 */
	if (!st || !RDC_SUCCESS(ret2)) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!r_net_write6 error st %x ret %d seq %u",
		    st, ret2, diskio.seq);
#endif
		if (dset) {
			rdc_net_del_set(diskio.cd, dset);
		}
	} else {
		if (dset) {
			rdc_net_put_set(diskio.cd, dset);
		}
	}
	if (diskio.data.data_val) {
		kmem_free(diskio.data.data_val, RDC_MAXDATA);
		RDC_DSMEMUSE(-RDC_MAXDATA);
	}
}
1167 
1168 /*
1169  * r_net_ping4
1170  *
1171  * received on the primary.
1172  */
1173 static void
1174 r_net_ping4(SVCXPRT *xprt, struct svc_req *req)
1175 {
1176 	struct rdc_ping6 ping;
1177 	int e, ret = 0;
1178 	rdc_if_t *ip;
1179 
1180 	e = SVC_GETARGS(xprt, xdr_rdc_ping6, (char *)&ping);
1181 	if (e) {
1182 		mutex_enter(&rdc_ping_lock);
1183 
1184 		/* update specified interface */
1185 
1186 		for (ip = rdc_if_top; ip; ip = ip->next) {
1187 			if ((bcmp(ping.p_ifaddr, ip->ifaddr.buf,
1188 			    RDC_MAXADDR) == 0) &&
1189 			    (bcmp(ping.s_ifaddr, ip->r_ifaddr.buf,
1190 			    RDC_MAXADDR) == 0)) {
1191 				ip->new_pulse++;
1192 				ip->deadness = 1;
1193 
1194 				/* Update the rpc protocol version to use */
1195 
1196 				ip->rpc_version = req->rq_vers;
1197 				break;
1198 			}
1199 		}
1200 
1201 		mutex_exit(&rdc_ping_lock);
1202 	} else {
1203 		svcerr_decode(xprt);
1204 #ifdef DEBUG
1205 		cmn_err(CE_NOTE, "!SNDR: couldn't get ping4 arguments");
1206 #endif
1207 	}
1208 
1209 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1210 }
1211 
1212 /*
1213  * r_net_ping7
1214  *
1215  * received on the primary.
1216  */
1217 static void
1218 r_net_ping7(SVCXPRT *xprt, struct svc_req *req)
1219 {
1220 	struct rdc_ping ping;
1221 	int e, ret = 0;
1222 	rdc_if_t *ip;
1223 	unsigned short *sp;
1224 
1225 	bzero(&ping, sizeof (struct rdc_ping));
1226 	e = SVC_GETARGS(xprt, xdr_rdc_ping, (char *)&ping);
1227 	if (e) {
1228 		sp = (unsigned short *)ping.p_ifaddr.buf;
1229 		*sp = ntohs(*sp);
1230 		sp = (unsigned short *)ping.s_ifaddr.buf;
1231 		*sp = ntohs(*sp);
1232 		mutex_enter(&rdc_ping_lock);
1233 
1234 		/* update specified interface */
1235 
1236 		for (ip = rdc_if_top; ip; ip = ip->next) {
1237 			if ((bcmp(ping.p_ifaddr.buf, ip->ifaddr.buf,
1238 			    ping.p_ifaddr.len) == 0) &&
1239 			    (bcmp(ping.s_ifaddr.buf, ip->r_ifaddr.buf,
1240 			    ping.s_ifaddr.len) == 0)) {
1241 				ip->new_pulse++;
1242 				ip->deadness = 1;
1243 
1244 				/* Update the rpc protocol version to use */
1245 
1246 				ip->rpc_version = req->rq_vers;
1247 				break;
1248 			}
1249 		}
1250 
1251 		mutex_exit(&rdc_ping_lock);
1252 	} else {
1253 		svcerr_decode(xprt);
1254 #ifdef DEBUG
1255 		cmn_err(CE_NOTE, "!SNDR: couldn't get ping7 arguments");
1256 #endif
1257 	}
1258 
1259 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1260 }
1261 
1262 
1263 /*
1264  * r_net_bmap (v5)
1265  * WARNING acts as both client and server
1266  */
1267 static void
1268 r_net_bmap(SVCXPRT *xprt)
1269 {
1270 	int e, ret = EINVAL;
1271 	struct bmap b;
1272 	rdc_k_info_t *krdc;
1273 	rdc_u_info_t *urdc;
1274 	struct bmap6 b6;
1275 
1276 
1277 	e = SVC_GETARGS(xprt, xdr_bmap, (char *)&b);
1278 	if (e == TRUE) {
1279 		krdc = &rdc_k_info[b.cd];
1280 		urdc = &rdc_u_info[b.cd];
1281 		if (b.cd >= 0 && b.cd < rdc_max_sets && IS_ENABLED(urdc) &&
1282 		    ((krdc->type_flag & RDC_DISABLEPEND) == 0)) {
1283 			krdc->rpc_version = RDC_VERSION5;
1284 			b6.cd = b.cd;
1285 			b6.dual = b.dual;
1286 			b6.size = b.size;
1287 			ret = RDC_SEND_BITMAP(&b6);
1288 		}
1289 	}
1290 
1291 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1292 }
1293 
1294 /*
1295  * r_net_bmap (v6)
1296  * WARNING acts as both client and server
1297  */
1298 static void
1299 r_net_bmap6(SVCXPRT *xprt)
1300 {
1301 	int e, ret = EINVAL;
1302 	struct bmap6 b;
1303 	rdc_k_info_t *krdc;
1304 	rdc_u_info_t *urdc;
1305 
1306 	e = SVC_GETARGS(xprt, xdr_bmap6, (char *)&b);
1307 	if (e == TRUE) {
1308 		krdc = &rdc_k_info[b.cd];
1309 		urdc = &rdc_u_info[b.cd];
1310 		if (b.cd >= 0 && b.cd < rdc_max_sets && IS_ENABLED(urdc) &&
1311 		    ((krdc->type_flag & RDC_DISABLEPEND) == 0)) {
1312 			krdc->rpc_version = RDC_VERSION6;
1313 			ret = RDC_SEND_BITMAP(&b);
1314 		}
1315 	}
1316 	/*
1317 	 * If the bitmap send has succeeded, clear it.
1318 	 */
1319 	if (ret == 0) {
1320 #ifdef DEBUG
1321 		cmn_err(CE_NOTE, "!Bitmap clear in r_net_bmap6");
1322 #endif
1323 		RDC_ZERO_BITMAP(krdc);
1324 		rdc_many_enter(krdc);
1325 		rdc_clr_flags(urdc, RDC_CLR_AFTERSYNC);
1326 		rdc_many_exit(krdc);
1327 	}
1328 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1329 }
1330 
1331 /*
1332  * r_net_bdata
1333  */
1334 static void
1335 r_net_bdata(SVCXPRT *xprt)
1336 {
1337 	struct net_bdata bd;
1338 	struct net_bdata6 bd6;
1339 	int e, ret = -1;
1340 	rdc_k_info_t *krdc;
1341 	rdc_u_info_t *urdc;
1342 
1343 	/*
1344 	 * We have to convert it to the internal form here,
1345 	 * net_data6, when we know that we will have to convert
1346 	 * it back to the v5 variant for transmission.
1347 	 */
1348 
1349 	bd.data.data_val = kmem_alloc(BMAP_BLKSIZE, KM_NOSLEEP);
1350 	if (bd.data.data_val == NULL)
1351 		goto out;
1352 
1353 	e = SVC_GETARGS(xprt, xdr_net_bdata, (char *)&bd);
1354 	if (e == TRUE) {
1355 		krdc = &rdc_k_info[bd.cd];
1356 		urdc = &rdc_u_info[bd.cd];
1357 		if (bd.cd >= 0 && bd.cd < rdc_max_sets && IS_ENABLED(urdc) &&
1358 		    ((krdc->type_flag & RDC_DISABLEPEND) == 0)) {
1359 			bd6.cd = bd.cd;
1360 			bd6.offset = bd.offset;
1361 			bd6.size = bd.size;
1362 			bd6.data.data_len = bd.data.data_len;
1363 			bd6.data.data_val = bd.data.data_val;
1364 			ret = RDC_OR_BITMAP(&bd6);
1365 		}
1366 	}
1367 	kmem_free(bd.data.data_val, BMAP_BLKSIZE);
1368 out:
1369 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1370 }
1371 
1372 /*
1373  * r_net_bdata v6
1374  */
1375 static void
1376 r_net_bdata6(SVCXPRT *xprt)
1377 {
1378 	struct net_bdata6 bd;
1379 	int e, ret = -1;
1380 	rdc_k_info_t *krdc;
1381 	rdc_u_info_t *urdc;
1382 
1383 	/*
1384 	 * just allocate the bigger block, regardless of < V7
1385 	 * bd.size will dictate how much we lor into our bitmap
1386 	 * the other option would be write r_net_bdata7 that is identical
1387 	 * to this function, but a V7 alloc.
1388 	 */
1389 	bd.data.data_val = kmem_alloc(BMAP_BLKSIZEV7, KM_NOSLEEP);
1390 	if (bd.data.data_val == NULL)
1391 		goto out;
1392 
1393 	e = SVC_GETARGS(xprt, xdr_net_bdata6, (char *)&bd);
1394 	if (e == TRUE) {
1395 		krdc = &rdc_k_info[bd.cd];
1396 		urdc = &rdc_u_info[bd.cd];
1397 		if (bd.cd >= 0 && bd.cd < rdc_max_sets && IS_ENABLED(urdc) &&
1398 		    ((krdc->type_flag & RDC_DISABLEPEND) == 0))
1399 			ret = RDC_OR_BITMAP(&bd);
1400 	}
1401 	/*
1402 	 * Write the merged bitmap.
1403 	 */
1404 	if ((ret == 0) && bd.endoblk && (krdc->bitmap_write > 0)) {
1405 #ifdef DEBUG
1406 		cmn_err(CE_NOTE, "!r_net_bdata6: Written bitmap for %s:%s",
1407 		    urdc->secondary.intf, urdc->secondary.file);
1408 #endif
1409 		ret = rdc_write_bitmap(krdc);
1410 	}
1411 	kmem_free(bd.data.data_val, BMAP_BLKSIZEV7);
1412 out:
1413 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1414 }
1415 
1416 /*
1417  * r_net_getsize (v5)
1418  */
1419 static void
1420 r_net_getsize(SVCXPRT *xprt)
1421 {
1422 	int e, ret = -1, index;
1423 	rdc_k_info_t *krdc;
1424 
1425 	e = SVC_GETARGS(xprt, xdr_int, (char *)&index);
1426 	if (e) {
1427 		krdc = &rdc_k_info[index];
1428 		if (IS_VALID_INDEX(index) && ((krdc->type_flag &
1429 		    RDC_DISABLEPEND) == 0))
1430 			ret = mirror_getsize(index);
1431 	}
1432 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1433 }
1434 
1435 /*
1436  * r_net_getsize (v6)
1437  */
1438 static void
1439 r_net_getsize6(SVCXPRT *xprt)
1440 {
1441 	int e, index;
1442 	rdc_k_info_t *krdc;
1443 	uint64_t ret;
1444 
1445 	/*
1446 	 * small change in semantics here, as we can't return
1447 	 * -1 over the wire anymore.
1448 	 */
1449 	ret = 0;
1450 
1451 	e = SVC_GETARGS(xprt, xdr_int, (char *)&index);
1452 	if (e) {
1453 		krdc = &rdc_k_info[index];
1454 		if (IS_VALID_INDEX(index) && ((krdc->type_flag &
1455 		    RDC_DISABLEPEND) == 0))
1456 			ret = mirror_getsize(index);
1457 	}
1458 	(void) svc_sendreply(xprt, xdr_u_longlong_t, (char *)&ret);
1459 }
1460 
1461 
1462 /*
1463  * r_net_state4
1464  */
1465 static void
1466 r_net_state4(SVCXPRT *xprt)
1467 {
1468 	rdc_u_info_t *urdc;
1469 	rdc_k_info_t *krdc;
1470 	struct set_state4 state;
1471 	rdc_set_t rdc_set;
1472 	int e, index = -1;
1473 	int options;
1474 	int log = 0;
1475 	int done = 0;
1476 	int slave = 0;
1477 	int rev_sync = 0;
1478 
1479 	e = SVC_GETARGS(xprt, xdr_set_state4, (char *)&state);
1480 	if (e) {
1481 		init_rdc_netbuf(&(rdc_set.primary.addr));
1482 		init_rdc_netbuf(&(rdc_set.secondary.addr));
1483 		bcopy(state.netaddr, rdc_set.primary.addr.buf,
1484 		    state.netaddrlen);
1485 		bcopy(state.rnetaddr, rdc_set.secondary.addr.buf,
1486 		    state.rnetaddrlen);
1487 		rdc_set.primary.addr.len = state.netaddrlen;
1488 		rdc_set.secondary.addr.len = state.rnetaddrlen;
1489 		(void) strncpy(rdc_set.primary.file, state.pfile,
1490 		    RDC_MAXNAMLEN);
1491 		(void) strncpy(rdc_set.secondary.file, state.sfile,
1492 		    RDC_MAXNAMLEN);
1493 		options = state.flag;
1494 		index = rdc_lookup_byaddr(&rdc_set);
1495 
1496 		krdc = &rdc_k_info[index];
1497 
1498 		if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1499 #ifdef DEBUG
1500 			cmn_err(CE_WARN,
1501 			    "!r_net_state: no index or disable pending");
1502 #endif
1503 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1504 			return;
1505 		}
1506 
1507 		urdc = &rdc_u_info[index];
1508 
1509 		if (!IS_ENABLED(urdc)) {
1510 			index = -1;
1511 #ifdef DEBUG
1512 			cmn_err(CE_WARN, "!r_net_state: set not enabled ");
1513 #endif
1514 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1515 			return;
1516 		}
1517 
1518 		if (krdc->lsrv == NULL) {
1519 			cmn_err(CE_NOTE, "!r_net_state: no valid svp\n");
1520 			index = -1;
1521 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1522 			return;
1523 		}
1524 		if (!krdc || !krdc->group) {
1525 #ifdef DEBUG
1526 			cmn_err(CE_NOTE,
1527 			    "!r_net_state: no valid krdc %p\n", (void*)krdc);
1528 #endif
1529 			index = -1;
1530 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1531 			return;
1532 		}
1533 
1534 		mutex_enter(&rdc_conf_lock);
1535 		if (krdc->type_flag & RDC_DISABLEPEND) {
1536 			mutex_exit(&rdc_conf_lock);
1537 			index = -1;
1538 #ifdef DEBUG
1539 			cmn_err(CE_WARN, "!r_net_state: disable pending");
1540 #endif
1541 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1542 			return;
1543 		}
1544 		set_busy(krdc);
1545 		mutex_exit(&rdc_conf_lock);
1546 
1547 		rdc_group_enter(krdc);
1548 
1549 		if (rdc_get_vflags(urdc) & RDC_PRIMARY)
1550 			krdc->intf = rdc_add_to_if(krdc->lsrv,
1551 			    &(urdc->primary.addr), &(urdc->secondary.addr), 1);
1552 		else
1553 			krdc->intf = rdc_add_to_if(krdc->lsrv,
1554 			    &(urdc->secondary.addr), &(urdc->primary.addr), 0);
1555 
1556 		if (options & CCIO_SLAVE) {
1557 			/*
1558 			 * mark that the bitmap needs clearing.
1559 			 */
1560 			rdc_many_enter(krdc);
1561 			rdc_set_flags(urdc, RDC_CLR_AFTERSYNC);
1562 			rdc_many_exit(krdc);
1563 
1564 			/* Starting forward sync */
1565 			if (urdc->volume_size == 0)
1566 				rdc_get_details(krdc);
1567 			if (urdc->volume_size == 0) {
1568 				index = -1;
1569 				goto out;
1570 			}
1571 			if (krdc->dcio_bitmap == NULL) {
1572 				if (rdc_resume_bitmap(krdc) < 0) {
1573 					index = -1;
1574 					goto out;
1575 				}
1576 			}
1577 			if (rdc_allow_sec_sync(urdc, CCIO_SLAVE) < 0) {
1578 				index = -1;
1579 				goto out;
1580 			}
1581 			rdc_dump_dsets(index);
1582 			slave = 1;
1583 		} else if (options & CCIO_RSYNC) {
1584 			/*
1585 			 * mark that the bitmap needs clearing.
1586 			 */
1587 			rdc_many_enter(krdc);
1588 			rdc_set_flags(urdc, RDC_CLR_AFTERSYNC);
1589 			rdc_many_exit(krdc);
1590 
1591 			/* Starting reverse sync */
1592 			if (rdc_get_vflags(urdc) & (RDC_SYNC_NEEDED |
1593 			    RDC_VOL_FAILED | RDC_BMP_FAILED)) {
1594 				index = -1;
1595 				goto out;
1596 			}
1597 			if (rdc_allow_sec_sync(urdc, CCIO_RSYNC) < 0) {
1598 				index = -1;
1599 				goto out;
1600 			}
1601 			rdc_dump_dsets(index);
1602 			rev_sync = 1;
1603 		} else if (options & CCIO_DONE) {
1604 			/* Sync completed OK */
1605 			if (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)
1606 				done = 1;	/* forward sync complete */
1607 			rdc_many_enter(krdc);
1608 			rdc_clr_flags(urdc, RDC_SYNCING | RDC_SYNC_NEEDED);
1609 			rdc_clr_mflags(urdc, RDC_SLAVE | RDC_RSYNC_NEEDED);
1610 			rdc_many_exit(krdc);
1611 			rdc_write_state(urdc);
1612 			if (rdc_get_vflags(urdc) & RDC_CLR_AFTERSYNC) {
1613 				RDC_ZERO_BITMAP(krdc);
1614 				rdc_many_enter(krdc);
1615 				rdc_clr_flags(urdc, RDC_CLR_AFTERSYNC);
1616 				rdc_many_exit(krdc);
1617 			}
1618 		} else if (options & CCIO_ENABLELOG) {
1619 			/* Sync aborted or logging started */
1620 			if (!(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
1621 				rdc_clr_flags(urdc, RDC_SYNCING);
1622 				rdc_many_enter(krdc);
1623 				rdc_clr_mflags(urdc, RDC_SLAVE);
1624 				rdc_many_exit(krdc);
1625 			}
1626 			log = 1;
1627 		}
1628 out:
1629 		rdc_group_exit(krdc);
1630 		free_rdc_netbuf(&(rdc_set.primary.addr));
1631 		free_rdc_netbuf(&(rdc_set.secondary.addr));
1632 
1633 		if (slave) {
1634 			if (_rdc_sync_event_notify(RDC_SYNC_START,
1635 			    urdc->secondary.file, urdc->group_name) >= 0) {
1636 				rdc_group_enter(krdc);
1637 				rdc_clr_flags(urdc, RDC_LOGGING);
1638 				rdc_many_enter(krdc);
1639 				rdc_clr_flags(urdc, RDC_VOL_FAILED);
1640 				rdc_set_flags(urdc,
1641 				    RDC_SYNCING | RDC_SYNC_NEEDED);
1642 				rdc_set_mflags(urdc, RDC_SLAVE);
1643 				rdc_many_exit(krdc);
1644 				rdc_write_state(urdc);
1645 				rdc_group_exit(krdc);
1646 			} else {
1647 				index = -1;
1648 			}
1649 		} else if (rev_sync) {
1650 			/* Check to see if volume is mounted */
1651 			if (_rdc_sync_event_notify(RDC_RSYNC_START,
1652 			    urdc->secondary.file, urdc->group_name) >= 0) {
1653 				rdc_group_enter(krdc);
1654 				rdc_clr_flags(urdc, RDC_LOGGING);
1655 				rdc_set_flags(urdc, RDC_SYNCING);
1656 				rdc_write_state(urdc);
1657 				rdc_group_exit(krdc);
1658 			} else {
1659 				index = -1;
1660 			}
1661 		} else if (done) {
1662 
1663 			/*
1664 			 * special case...
1665 			 * if this set is in a group, then sndrsyncd will
1666 			 * make sure that all sets in the group are REP
1667 			 * before updating the config to "update", telling
1668 			 * sndrsyncd that it is ok to take anther snapshot
1669 			 * on a following sync. The important part about
1670 			 * the whole thing is that syncd needs kernel stats.
1671 			 * however, this thread must set the set busy to
1672 			 * avoid disables. since this is the only
1673 			 * sync_event_notify() that will cause a status
1674 			 * call back into the kernel, and we will not be
1675 			 * accessing the group structure, we have to wakeup now
1676 			 */
1677 
1678 			mutex_enter(&rdc_conf_lock);
1679 			wakeup_busy(krdc);
1680 			mutex_exit(&rdc_conf_lock);
1681 
1682 			(void) _rdc_sync_event_notify(RDC_SYNC_DONE,
1683 			    urdc->secondary.file, urdc->group_name);
1684 		}
1685 	}
1686 
1687 	if (!done) {
1688 		mutex_enter(&rdc_conf_lock);
1689 		wakeup_busy(krdc);
1690 		mutex_exit(&rdc_conf_lock);
1691 	}
1692 
1693 	(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1694 	if (log) {
1695 		rdc_group_enter(krdc);
1696 		rdc_group_log(krdc, RDC_NOFLUSH | RDC_OTHERREMOTE,
1697 		    "Sync aborted or logging started");
1698 		rdc_group_exit(krdc);
1699 	}
1700 }
1701 
1702 
1703 /*
1704  * r_net_state
1705  */
1706 static void
1707 r_net_state(SVCXPRT *xprt)
1708 {
1709 	rdc_u_info_t *urdc;
1710 	rdc_k_info_t *krdc;
1711 	struct set_state state;
1712 	rdc_set_t rdc_set;
1713 	int e, index = -1;
1714 	int options;
1715 	int log = 0;
1716 	int done = 0;
1717 	int slave = 0;
1718 	int rev_sync = 0;
1719 	unsigned short *sp;
1720 
1721 	bzero(&state, sizeof (struct set_state));
1722 	e = SVC_GETARGS(xprt, xdr_set_state, (char *)&state);
1723 	if (e) {
1724 		init_rdc_netbuf(&(rdc_set.primary.addr));
1725 		init_rdc_netbuf(&(rdc_set.secondary.addr));
1726 		sp = (unsigned short *)(state.netaddr.buf);
1727 		*sp = ntohs(*sp);
1728 		bcopy(state.netaddr.buf, rdc_set.primary.addr.buf,
1729 		    state.netaddrlen);
1730 		sp = (unsigned short *)(state.rnetaddr.buf);
1731 		*sp = ntohs(*sp);
1732 		bcopy(state.rnetaddr.buf, rdc_set.secondary.addr.buf,
1733 		    state.rnetaddrlen);
1734 		rdc_set.primary.addr.len = state.netaddrlen;
1735 		rdc_set.secondary.addr.len = state.rnetaddrlen;
1736 		(void) strncpy(rdc_set.primary.file, state.pfile,
1737 		    RDC_MAXNAMLEN);
1738 		(void) strncpy(rdc_set.secondary.file, state.sfile,
1739 		    RDC_MAXNAMLEN);
1740 		options = state.flag;
1741 		index = rdc_lookup_byaddr(&rdc_set);
1742 
1743 		krdc = &rdc_k_info[index];
1744 
1745 		if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1746 #ifdef DEBUG
1747 			cmn_err(CE_WARN,
1748 			    "!r_net_state: no index or disable pending");
1749 #endif
1750 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1751 			return;
1752 		}
1753 
1754 		urdc = &rdc_u_info[index];
1755 
1756 		if (!IS_ENABLED(urdc)) {
1757 			index = -1;
1758 #ifdef DEBUG
1759 			cmn_err(CE_WARN, "!r_net_state: set not enabled ");
1760 #endif
1761 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1762 			return;
1763 		}
1764 
1765 		if (krdc->lsrv == NULL) {
1766 			cmn_err(CE_NOTE, "!r_net_state: no valid svp\n");
1767 			index = -1;
1768 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1769 			return;
1770 		}
1771 		if (!krdc || !krdc->group) {
1772 #ifdef DEBUG
1773 			cmn_err(CE_NOTE,
1774 			    "!r_net_state: no valid krdc %p\n", (void*)krdc);
1775 #endif
1776 			index = -1;
1777 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1778 			return;
1779 		}
1780 
1781 		mutex_enter(&rdc_conf_lock);
1782 		if (krdc->type_flag & RDC_DISABLEPEND) {
1783 			mutex_exit(&rdc_conf_lock);
1784 			index = -1;
1785 #ifdef DEBUG
1786 			cmn_err(CE_WARN, "!r_net_state: disable pending");
1787 #endif
1788 			(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1789 			return;
1790 		}
1791 		set_busy(krdc);
1792 		mutex_exit(&rdc_conf_lock);
1793 
1794 		rdc_group_enter(krdc);
1795 
1796 		if (rdc_get_vflags(urdc) & RDC_PRIMARY)
1797 			krdc->intf = rdc_add_to_if(krdc->lsrv,
1798 			    &(urdc->primary.addr), &(urdc->secondary.addr), 1);
1799 		else
1800 			krdc->intf = rdc_add_to_if(krdc->lsrv,
1801 			    &(urdc->secondary.addr), &(urdc->primary.addr), 0);
1802 
1803 		if (options & CCIO_SLAVE) {
1804 			/*
1805 			 * mark that the bitmap needs clearing.
1806 			 */
1807 			rdc_many_enter(krdc);
1808 			rdc_set_flags(urdc, RDC_CLR_AFTERSYNC);
1809 			rdc_many_exit(krdc);
1810 
1811 			/* Starting forward sync */
1812 			if (urdc->volume_size == 0)
1813 				rdc_get_details(krdc);
1814 			if (urdc->volume_size == 0) {
1815 				index = -1;
1816 				goto out;
1817 			}
1818 			if (krdc->dcio_bitmap == NULL) {
1819 				if (rdc_resume_bitmap(krdc) < 0) {
1820 					index = -1;
1821 					goto out;
1822 				}
1823 			}
1824 			if (rdc_allow_sec_sync(urdc, CCIO_SLAVE) < 0) {
1825 				index = -1;
1826 				goto out;
1827 			}
1828 			rdc_dump_dsets(index);
1829 			slave = 1;
1830 		} else if (options & CCIO_RSYNC) {
1831 			/*
1832 			 * mark that the bitmap needs clearing.
1833 			 */
1834 			rdc_many_enter(krdc);
1835 			rdc_set_flags(urdc, RDC_CLR_AFTERSYNC);
1836 			rdc_many_exit(krdc);
1837 
1838 			/* Starting reverse sync */
1839 			if (rdc_get_vflags(urdc) & (RDC_SYNC_NEEDED |
1840 			    RDC_VOL_FAILED | RDC_BMP_FAILED)) {
1841 				index = -1;
1842 				goto out;
1843 			}
1844 			if (rdc_allow_sec_sync(urdc, CCIO_RSYNC) < 0) {
1845 				index = -1;
1846 				goto out;
1847 			}
1848 			rdc_dump_dsets(index);
1849 			rev_sync = 1;
1850 		} else if (options & CCIO_DONE) {
1851 			/* Sync completed OK */
1852 			if (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)
1853 				done = 1;	/* forward sync complete */
1854 			rdc_many_enter(krdc);
1855 			rdc_clr_flags(urdc, RDC_SYNCING | RDC_SYNC_NEEDED);
1856 			rdc_clr_mflags(urdc, RDC_SLAVE | RDC_RSYNC_NEEDED);
1857 			rdc_many_exit(krdc);
1858 			rdc_write_state(urdc);
1859 			if (rdc_get_vflags(urdc) & RDC_CLR_AFTERSYNC) {
1860 				RDC_ZERO_BITMAP(krdc);
1861 				rdc_many_enter(krdc);
1862 				rdc_clr_flags(urdc, RDC_CLR_AFTERSYNC);
1863 				rdc_many_exit(krdc);
1864 			}
1865 		} else if (options & CCIO_ENABLELOG) {
1866 			/* Sync aborted or logging started */
1867 			if (!(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
1868 				rdc_clr_flags(urdc, RDC_SYNCING);
1869 				rdc_many_enter(krdc);
1870 				rdc_clr_mflags(urdc, RDC_SLAVE);
1871 				rdc_many_exit(krdc);
1872 			}
1873 			log = 1;
1874 		}
1875 out:
1876 		rdc_group_exit(krdc);
1877 		free_rdc_netbuf(&(rdc_set.primary.addr));
1878 		free_rdc_netbuf(&(rdc_set.secondary.addr));
1879 
1880 		if (slave) {
1881 			if (_rdc_sync_event_notify(RDC_SYNC_START,
1882 			    urdc->secondary.file, urdc->group_name) >= 0) {
1883 				rdc_group_enter(krdc);
1884 				rdc_clr_flags(urdc, RDC_LOGGING);
1885 				rdc_many_enter(krdc);
1886 				rdc_clr_flags(urdc, RDC_VOL_FAILED);
1887 				rdc_set_flags(urdc,
1888 				    RDC_SYNCING | RDC_SYNC_NEEDED);
1889 				rdc_set_mflags(urdc, RDC_SLAVE);
1890 				rdc_many_exit(krdc);
1891 				rdc_write_state(urdc);
1892 				rdc_group_exit(krdc);
1893 			} else {
1894 				index = -1;
1895 			}
1896 		} else if (rev_sync) {
1897 			/* Check to see if volume is mounted */
1898 			if (_rdc_sync_event_notify(RDC_RSYNC_START,
1899 			    urdc->secondary.file, urdc->group_name) >= 0) {
1900 				rdc_group_enter(krdc);
1901 				rdc_clr_flags(urdc, RDC_LOGGING);
1902 				rdc_set_flags(urdc, RDC_SYNCING);
1903 				rdc_write_state(urdc);
1904 				rdc_group_exit(krdc);
1905 			} else {
1906 				index = -1;
1907 			}
1908 		} else if (done) {
1909 
1910 			/*
1911 			 * special case...
1912 			 * if this set is in a group, then sndrsyncd will
1913 			 * make sure that all sets in the group are REP
1914 			 * before updating the config to "update", telling
1915 			 * sndrsyncd that it is ok to take anther snapshot
1916 			 * on a following sync. The important part about
1917 			 * the whole thing is that syncd needs kernel stats.
1918 			 * however, this thread must set the set busy to
1919 			 * avoid disables. since this is the only
1920 			 * sync_event_notify() that will cause a status
1921 			 * call back into the kernel, and we will not be
1922 			 * accessing the group structure, we have to wakeup now
1923 			 */
1924 
1925 			mutex_enter(&rdc_conf_lock);
1926 			wakeup_busy(krdc);
1927 			mutex_exit(&rdc_conf_lock);
1928 
1929 			(void) _rdc_sync_event_notify(RDC_SYNC_DONE,
1930 			    urdc->secondary.file, urdc->group_name);
1931 		}
1932 	}
1933 
1934 	if (!done) {
1935 		mutex_enter(&rdc_conf_lock);
1936 		wakeup_busy(krdc);
1937 		mutex_exit(&rdc_conf_lock);
1938 	}
1939 
1940 	(void) svc_sendreply(xprt, xdr_int, (char *)&index);
1941 	if (log) {
1942 		rdc_group_enter(krdc);
1943 		rdc_group_log(krdc, RDC_NOFLUSH | RDC_OTHERREMOTE,
1944 		    "Sync aborted or logging started");
1945 		rdc_group_exit(krdc);
1946 	}
1947 	free_rdc_netbuf(&(state.netaddr));
1948 	free_rdc_netbuf(&(state.rnetaddr));
1949 }
1950 
1951 /*
1952  * r_net_getstate4
1953  * Return our state to client
1954  */
1955 static void
1956 r_net_getstate4(SVCXPRT *xprt, struct svc_req *req)
1957 {
1958 	int e, ret = -1, index = -1;
1959 	struct set_state4 state;
1960 	rdc_u_info_t *urdc;
1961 	rdc_set_t rdc_set;
1962 
1963 	bzero(&state, sizeof (struct set_state));
1964 	e = SVC_GETARGS(xprt, xdr_set_state4, (char *)&state);
1965 	if (e) {
1966 		init_rdc_netbuf(&(rdc_set.primary.addr));
1967 		init_rdc_netbuf(&(rdc_set.secondary.addr));
1968 		bcopy(state.netaddr, rdc_set.primary.addr.buf,
1969 		    state.netaddrlen);
1970 		bcopy(state.rnetaddr, rdc_set.secondary.addr.buf,
1971 		    state.rnetaddrlen);
1972 		rdc_set.primary.addr.len = state.netaddrlen;
1973 		rdc_set.secondary.addr.len = state.rnetaddrlen;
1974 		(void) strncpy(rdc_set.primary.file, state.pfile,
1975 		    RDC_MAXNAMLEN);
1976 		(void) strncpy(rdc_set.secondary.file, state.sfile,
1977 		    RDC_MAXNAMLEN);
1978 		index = rdc_lookup_byaddr(&rdc_set);
1979 		if (index >= 0) {
1980 			urdc = &rdc_u_info[index];
1981 
1982 			ret = 0;
1983 			if (rdc_get_vflags(urdc) & RDC_SYNCING)
1984 				ret |= 4;
1985 			if (rdc_get_vflags(urdc) & RDC_SLAVE)
1986 				ret |= 2;
1987 			if (rdc_get_vflags(urdc) & RDC_LOGGING)
1988 				ret |= 1;
1989 			rdc_set_if_vers(urdc, req->rq_vers);
1990 		}
1991 		free_rdc_netbuf(&(rdc_set.primary.addr));
1992 		free_rdc_netbuf(&(rdc_set.secondary.addr));
1993 	}
1994 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
1995 }
1996 
1997 /*
1998  * r_net_getstate7
1999  * Return our state to client
2000  */
2001 static void
2002 r_net_getstate7(SVCXPRT *xprt, struct svc_req *req)
2003 {
2004 	int e, ret = -1, index = -1;
2005 	struct set_state state;
2006 	char pstr[RDC_MAXNAMLEN];
2007 	char sstr[RDC_MAXNAMLEN];
2008 	rdc_u_info_t *urdc;
2009 	rdc_set_t rdc_set;
2010 	unsigned short *sp;
2011 
2012 	bzero(&state, sizeof (struct set_state));
2013 	state.pfile = pstr;
2014 	state.sfile = sstr;
2015 
2016 	e = SVC_GETARGS(xprt, xdr_set_state, (char *)&state);
2017 	if (e) {
2018 		init_rdc_netbuf(&(rdc_set.primary.addr));
2019 		init_rdc_netbuf(&(rdc_set.secondary.addr));
2020 		sp = (unsigned short *)(state.netaddr.buf);
2021 		*sp = ntohs(*sp);
2022 		bcopy(state.netaddr.buf, rdc_set.primary.addr.buf,
2023 		    state.netaddrlen);
2024 		sp = (unsigned short *)(state.rnetaddr.buf);
2025 		*sp = ntohs(*sp);
2026 		bcopy(state.rnetaddr.buf, rdc_set.secondary.addr.buf,
2027 		    state.rnetaddrlen);
2028 		rdc_set.primary.addr.len = state.netaddrlen;
2029 		rdc_set.secondary.addr.len = state.rnetaddrlen;
2030 		/*
2031 		 * strncpy(rdc_set.primary.file, state.pfile, RDC_MAXNAMLEN);
2032 		 * strncpy(rdc_set.secondary.file, state.sfile, RDC_MAXNAMLEN);
2033 		 */
2034 		bcopy(state.pfile, rdc_set.primary.file, RDC_MAXNAMLEN);
2035 		bcopy(state.sfile, rdc_set.secondary.file, RDC_MAXNAMLEN);
2036 		index = rdc_lookup_byaddr(&rdc_set);
2037 		if (index >= 0) {
2038 			urdc = &rdc_u_info[index];
2039 
2040 			ret = 0;
2041 			if (rdc_get_vflags(urdc) & RDC_SYNCING)
2042 				ret |= 4;
2043 			if (rdc_get_vflags(urdc) & RDC_SLAVE)
2044 				ret |= 2;
2045 			if (rdc_get_vflags(urdc) & RDC_LOGGING)
2046 				ret |= 1;
2047 			rdc_set_if_vers(urdc, req->rq_vers);
2048 		}
2049 		free_rdc_netbuf(&(rdc_set.primary.addr));
2050 		free_rdc_netbuf(&(rdc_set.secondary.addr));
2051 	}
2052 	(void) svc_sendreply(xprt, xdr_int, (char *)&ret);
2053 }
2054 
2055 /*
2056  * copy from/to a dset/vector combination to a network xdr buffer.
2057  */
2058 static int
2059 rdc_dsetcopy(rdc_net_dataset_t *dset, nsc_vec_t *invec, nsc_off_t fba_pos,
2060     nsc_size_t fba_len, char *bdata, int blen, int dir)
2061 {
2062 	nsc_vec_t *vec;
2063 	uchar_t *sv_addr;
2064 	uchar_t *data;
2065 	int sv_len;
2066 	nsc_off_t fpos;
2067 	int len;
2068 	int n;
2069 
2070 	if (!bdata || !dset || !invec) {
2071 #ifdef DEBUG
2072 		cmn_err(CE_NOTE,
2073 		    "!rdc: dsetcopy: parameters failed bdata %p, dset %p "
2074 		    "invec %p", (void *)bdata, (void *)dset, (void *)invec);
2075 #endif
2076 		return (FALSE);
2077 	}
2078 
2079 	if (fba_len > MAX_RDC_FBAS ||
2080 	    (dir != COPY_IN && dir != COPY_OUT)) {
2081 #ifdef DEBUG
2082 		cmn_err(CE_NOTE,
2083 		    "!rdc: dsetcopy: params failed fba_len %" NSC_SZFMT
2084 		    " fba_pos %" NSC_SZFMT ", dir %d", fba_len, fba_pos, dir);
2085 #endif
2086 		return (FALSE);
2087 	}
2088 
2089 	data = (uchar_t *)bdata;	/* pointer to data in rpc */
2090 	len = FBA_SIZE(fba_len);	/* length of this transfer in bytes */
2091 	fpos = fba_pos;			/* start fba offset within buffer */
2092 
2093 	if (!len) {
2094 #ifdef DEBUG
2095 		cmn_err(CE_NOTE, "!rdc: dsetcopy: len = 0");
2096 #endif
2097 		return (FALSE);
2098 	}
2099 
2100 	if (len != blen) {
2101 #ifdef DEBUG
2102 		cmn_err(CE_NOTE, "!rdc:dsetcopy: len %d != blen %d", len, blen);
2103 #endif
2104 		if (len > blen)
2105 			len = blen;
2106 	}
2107 
2108 	if (!RDC_DSET_LIMITS(dset, fba_pos, fba_len)) {
2109 		/* should never happen */
2110 #ifdef DEBUG
2111 		cmn_err(CE_NOTE,
2112 		    "!rdc: dsetcopy: handle limits pos %" NSC_SZFMT " (%"
2113 		    NSC_SZFMT ") len %" NSC_SZFMT " (%" NSC_SZFMT ")",
2114 		    fba_pos, dset->pos, fba_len, dset->fbalen);
2115 #endif
2116 		return (FALSE);	/* Don't overrun handle */
2117 	}
2118 
2119 	vec = invec;
2120 	fpos -= dset->pos;
2121 
2122 	/* find starting position in vector */
2123 
2124 	for (; fpos >= FBA_NUM(vec->sv_len); vec++)
2125 		fpos -= FBA_NUM(vec->sv_len);
2126 
2127 	/*
2128 	 * Copy data
2129 	 */
2130 
2131 	sv_addr = vec->sv_addr + FBA_SIZE(fpos);
2132 	sv_len = vec->sv_len - FBA_SIZE(fpos);
2133 
2134 	while (len) {
2135 		if (!sv_addr)	/* end of vec - how did this happen? */
2136 			break;
2137 
2138 		n = min(sv_len, len);
2139 
2140 		if (dir == COPY_OUT)
2141 			bcopy(data, sv_addr, (size_t)n);
2142 		else
2143 			bcopy(sv_addr, data, (size_t)n);
2144 
2145 		sv_len -= n;
2146 		len -= n;
2147 
2148 		sv_addr += n;
2149 		data += n;
2150 
2151 		if (sv_len <= 0) {
2152 			/* goto next vector */
2153 			vec++;
2154 			sv_addr = vec->sv_addr;
2155 			sv_len = vec->sv_len;
2156 		}
2157 	}
2158 
2159 	return (TRUE);
2160 }
2161 
2162 
2163 /*
2164  * rdc_start_server
2165  * Starts the kRPC server for rdc. Uses tli file descriptor passed down
2166  * from user level rdc server.
2167  *
2168  * Returns: 0 or errno (NOT unistat!).
2169  */
2170 int
2171 rdc_start_server(struct rdc_svc_args *args, int mode)
2172 {
2173 	file_t *fp;
2174 	int ret;
2175 	struct cred *cred;
2176 	STRUCT_HANDLE(rdc_svc_args, rs);
2177 
2178 	STRUCT_SET_HANDLE(rs, mode, args);
2179 	cred = ddi_get_cred();
2180 	if (drv_priv(cred) != 0)
2181 		return (EPERM);
2182 	fp = getf(STRUCT_FGET(rs, fd));
2183 	if (fp == NULL) {
2184 #ifdef DEBUG
2185 		cmn_err(CE_WARN, "!rdc_start_server fd %d, fp %p", args->fd,
2186 		    (void *) fp);
2187 #endif
2188 		return (EBADF);
2189 	}
2190 
2191 	ret = rdcsrv_load(fp, rdc_srvtab, args, mode);
2192 
2193 	releasef(STRUCT_FGET(rs, fd));
2194 	return (ret);
2195 }
2196 
2197 /*
2198  * Allocate a new sleepq element.
2199  */
2200 
2201 static rdc_sleepq_t *
2202 rdc_newsleepq()
2203 {
2204 	rdc_sleepq_t	*sq;
2205 
2206 	sq = kmem_alloc(sizeof (rdc_sleepq_t), KM_SLEEP);
2207 	sq->next = NULL;
2208 #ifdef DEBUG
2209 	mutex_enter(&rdc_cntlock);
2210 	rdc_sleepcnt++;
2211 	mutex_exit(&rdc_cntlock);
2212 #endif
2213 	return (sq);
2214 }
2215 
2216 /*
2217  * free memory/resources used by a sleepq element.
2218  */
2219 static void
2220 rdc_delsleepq(rdc_sleepq_t *sq)
2221 {
2222 	rdc_net_dataset_t *dset;
2223 
2224 	if (sq->idx != -1) {
2225 		dset = rdc_net_get_set(sq->sindex, sq->idx);
2226 		if (dset) {
2227 			rdc_net_del_set(sq->sindex, dset);
2228 		}
2229 	}
2230 	kmem_free(sq, sizeof (rdc_sleepq_t));
2231 #ifdef DEBUG
2232 	mutex_enter(&rdc_cntlock);
2233 	rdc_sleepcnt--;
2234 	mutex_exit(&rdc_cntlock);
2235 #endif
2236 }
2237 
2238 
2239 /*
2240  * skip down the sleep q and insert the sleep request
2241  * in ascending order. Return 0 on success, 1 on failure.
2242  */
2243 static int
2244 rdc_sleepq(rdc_group_t *group, rdc_sleepq_t *sq)
2245 {
2246 	rdc_sleepq_t *findsq;
2247 
2248 
2249 	ASSERT(MUTEX_HELD(&group->ra_queue.net_qlock));
2250 	if (group->sleepq == NULL) {
2251 		group->sleepq = sq;
2252 	} else {
2253 		if (sq->seq == group->sleepq->seq) {
2254 			cmn_err(CE_WARN, "!rdc_sleepq: Attempt to "
2255 			    "add duplicate request to queue %d", sq->seq);
2256 			return (1);
2257 		}
2258 		if (RDC_INFRONT(sq->seq, group->sleepq->seq)) {
2259 			sq->next = group->sleepq;
2260 			group->sleepq = sq;
2261 		} else {
2262 			findsq = group->sleepq;
2263 
2264 			while (findsq->next) {
2265 				if (sq->seq == findsq->next->seq) {
2266 					cmn_err(CE_WARN, "!rdc_sleepq: "
2267 					    "Attempt to add duplicate "
2268 					    "request to queue %d", sq->seq);
2269 					return (1);
2270 				}
2271 				if (RDC_INFRONT(sq->seq, findsq->next->seq)) {
2272 					sq->next = findsq->next;
2273 					findsq->next = sq;
2274 					break;
2275 				}
2276 				findsq = findsq->next;
2277 			}
2278 			if (findsq->next == NULL)
2279 				findsq->next = sq;
2280 		}
2281 	}
2282 	return (0);
2283 }
2284 
2285 /*
2286  * run down the sleep q and discard all the sleepq elements.
2287  */
2288 void
2289 rdc_sleepqdiscard(rdc_group_t *group)
2290 {
2291 	rdc_sleepq_t *sq;
2292 	rdc_k_info_t *krdc;
2293 
2294 	ASSERT(MUTEX_HELD(&group->ra_queue.net_qlock));
2295 	sq = group->sleepq;
2296 
2297 	while (sq) {
2298 		rdc_sleepq_t *dsq;
2299 
2300 		dsq = sq;
2301 		krdc = &rdc_k_info[dsq->sindex];
2302 		if (krdc->io_kstats) {
2303 			mutex_enter(krdc->io_kstats->ks_lock);
2304 			kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2305 			mutex_exit(krdc->io_kstats->ks_lock);
2306 		}
2307 		sq = sq->next;
2308 		rdc_delsleepq(dsq);
2309 	}
2310 	group->sleepq = NULL;
2311 }
2312 
2313 /*
2314  * split any write requests down to maxfba sized chunks.
2315  */
/*
 * Write the single-item dataset `dset' to the local volume, splitting
 * the transfer into chunks no larger than the device's maxfbas limit.
 * Each chunk goes through rdc_combywrite() so any multihop copy is
 * performed as well.  On failure the volume is flagged RDC_VOL_FAILED
 * and RDC_SYNC_NEEDED and the state is written out; on success the
 * write kstats are updated.  `seq' is only used in the DEBUG trace
 * message (hence ARGSUSED).  Returns 0 on success, or the error from
 * the failing _rdc_rsrv_devs/nsc_* call.
 */
/*ARGSUSED*/
static int
rdc_writemaxfba(rdc_k_info_t *krdc, rdc_u_info_t *urdc,
    rdc_net_dataset_t *dset, uint_t seq, int nocache)
{
	int len;
	int ret;
	nsc_vec_t vector[2];
	nsc_buf_t *handle;
	int reserved;		/* device reserve held, must release on exit */
	int rtype;
	nsc_size_t mfba;	/* device max FBAs per single I/O */
	nsc_size_t wsize;	/* FBAs in the current chunk */
	nsc_off_t pos;
	int eintr_count;
	unsigned char *daddr;	/* walking pointer into the dataset buffer */
	int kstat_len;		/* total FBAs actually written, for kstats */

	kstat_len = len = dset->fbalen;
	ret = 0;
	handle = NULL;
	reserved = 0;
	rtype = RDC_RAW;

	ASSERT(dset->nitems == 1);

	/* retry a bounded number of times if the reserve is interrupted */
	eintr_count = 0;
	do {
		ret = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
		if (ret == EINTR) {
			++eintr_count;
			delay(2);
		}
	} while ((ret == EINTR) && (eintr_count < MAX_EINTR_COUNT));
	if (ret != 0) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!rdc_writemaxfba: reserve devs "
		    "failed %d", ret);
#endif
		goto out;

	}
	reserved = 1;
	/*
	 * Perhaps we should cache mfba.
	 */
	ret = nsc_maxfbas(RDC_U_FD(krdc), 0, &mfba);
	if (ret != 0) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!rdc_writemaxfba: msc_maxfbas failed %d",
		    ret);
#endif
		goto out;
	}

	ASSERT(urdc->volume_size != 0);
	if (dset->pos + len > urdc->volume_size) {
		/* should never happen */
		/*
		 * also need to trim down the vector
		 * sizes.
		 */
		kstat_len = len = urdc->volume_size - dset->pos;
		/*
		 * NOTE(review): this subtracts FBA_SIZE(len) where len is
		 * the new (trimmed) length; trimming the out-of-range tail
		 * would be FBA_SIZE(old_len - new_len) -- confirm intent.
		 */
		dset->head->len -= FBA_SIZE(len);
		ASSERT(dset->head->len > 0);
	}
	daddr = dset->head->dptr;
	pos = dset->pos;
	/* vector[1] is the NULL terminator required by nsc_buf vectors */
	vector[1].sv_addr = NULL;
	vector[1].sv_len = 0;

	while (len > 0) {
		wsize = min((nsc_size_t)len, mfba);
		vector[0].sv_addr = daddr;
		vector[0].sv_len = FBA_SIZE(wsize);

		/* release the previous chunk's buffer before reallocating */
		if (handle) {
			(void) nsc_free_buf(handle);
			handle = NULL;
		}
		/* NSC_NODATA: we supply our own data vector below */
		ret = nsc_alloc_buf(RDC_U_FD(krdc), pos, wsize,
		    NSC_WRBUF|NSC_NODATA|nocache, &handle);
		if (ret != 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!rdc_writemaxfba: "
			    "nsc_alloc (d1) buf failed %d at "
			    "pos %" NSC_SZFMT " len %" NSC_SZFMT,
			    ret, pos, wsize);
#endif
			goto out;
		}
		handle->sb_vec = &vector[0];
		ret = rdc_combywrite(krdc, handle);
		if (ret != 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!rdc_writemaxfba: "
			    "write failed (d1) %d offset %" NSC_SZFMT " "
			    "length %" NSC_SZFMT, ret, pos, wsize);
#endif
			goto out;
		}
		pos += wsize;
		len -= wsize;
		daddr += FBA_SIZE(wsize);
	}
out:
	if (!RDC_SUCCESS(ret)) {
		/* mark the (secondary) volume failed and needing a sync */
		if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			ASSERT(!(rdc_get_vflags(urdc) &
			    RDC_PRIMARY));
			rdc_many_enter(krdc);
			rdc_set_flags(urdc, RDC_SYNC_NEEDED);
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "svc write failed");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}
	} else {
		/* success */
#ifdef	DEBUG
		if (rdc_netwrite6) {
			/*
			 * This string is used in the ZatoIchi MASNDR
			 * tests, if you change this, update the test.
			 */
			cmn_err(CE_NOTE, "!writemaxfba: Write "
			    "sequence %u", seq);
		}
#endif
		if (krdc->io_kstats) {
			KSTAT_IO_PTR(krdc->io_kstats)->writes++;
			KSTAT_IO_PTR(krdc->io_kstats)->nwritten +=
			    FBA_SIZE(kstat_len);
		}
	}
	if (handle)
		(void) nsc_free_buf(handle);
	if (reserved)
		_rdc_rlse_devs(krdc, rtype);
	return (ret);
}
2457 
/*
 * Write the data in `handle' to the local volume and, when this set is
 * part of a multihop configuration, forward the same write to an
 * enabled target primary via _rdc_multi_write().  In the normal case
 * the local write is issued first and the multihop write second; if a
 * reverse sync is in progress on the target (RDC_SLAVE set) the local
 * write is deferred until after the multihop write.  A multihop write
 * failure is recorded in the target's flags only -- the return value
 * is the result of the local nsc_write().
 */
static int
rdc_combywrite(rdc_k_info_t *krdc, nsc_buf_t *handle)
{
	int rsync;	/* -1 = no multihop target; else RDC_SLAVE state */
	int ret;
	int multiret;

	rsync = -1;
	ret = 0;
	/* Handle multihop I/O even on error */
	if (IS_MULTI(krdc)) {
		rdc_k_info_t *ktmp;
		rdc_u_info_t *utmp;

		rdc_many_enter(krdc);
		/*
		 * Find a target primary that is enabled,
		 * taking account of the fact that this
		 * could be a multihop secondary
		 * connected to a 1-to-many primary.
		 */
		ktmp = krdc->multi_next;
		if (ktmp == NULL) {
			rdc_many_exit(krdc);
			goto multi_done;
		}
		utmp = &rdc_u_info[ktmp->index];
		do {
			if ((rdc_get_vflags(utmp) & RDC_PRIMARY)
			    /* CSTYLED */
			    && IS_ENABLED(utmp))
				break;

			ktmp = ktmp->many_next;
			utmp = &rdc_u_info[ktmp->index];
		} while (ktmp != krdc->multi_next);

		/* no enabled target primary found: plain local write only */
		if (!(rdc_get_vflags(utmp) & RDC_PRIMARY) ||
		    !IS_ENABLED(utmp)) {
			rdc_many_exit(krdc);
			goto multi_done;
		}

		rdc_many_exit(krdc);
		rsync = (rdc_get_mflags(utmp) & RDC_SLAVE);
		if (!rsync) {
			/* normal case - local io first */
			ret = nsc_write(handle, handle->sb_pos, handle->sb_len,
			    0);
		}
		multiret = _rdc_multi_write(handle, handle->sb_pos,
		    handle->sb_len, 0, ktmp);
		if (!RDC_SUCCESS(multiret)) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!combywrite: "
			    "rdc_multi_write failed "
			    "status %d ret %d",
			    handle->sb_error, multiret);
#endif
			/* flag the target volume failed and needing a sync */
			if (!(rdc_get_vflags(utmp) &
			    RDC_VOL_FAILED)) {
				rdc_many_enter(ktmp);
				if (rdc_get_vflags(utmp) &
				    RDC_PRIMARY) {
					rdc_set_mflags(utmp,
					    RDC_RSYNC_NEEDED);
				} else {
					rdc_set_flags(utmp,
					    RDC_SYNC_NEEDED);
				}
				rdc_set_flags(utmp,
				    RDC_VOL_FAILED);
				rdc_many_exit(ktmp);
				rdc_write_state(utmp);
			}
		}
	}

multi_done:
	if (rsync != 0) {
		/*
		 * Either:
		 * reverse sync in progress and so we
		 * need to do the local io after the
		 * (multihop) secondary io.
		 * Or:
		 * no multihop and this is the only io
		 * required.
		 */
		ret = nsc_write(handle, handle->sb_pos, handle->sb_len, 0);

	}
	return (ret);
}
2552 /*
2553  * set the pos and len values in the piggyback reply.
2554  */
2555 static void
2556 rdc_setbitind(int *pendcnt, net_pendvec_t *pvec, rdc_net_dataset_t *dset,
2557     uint_t seq, int pindex, int qpos)
2558 {
2559 	int pc;
2560 	ASSERT(*pendcnt < RDC_MAXPENDQ);
2561 
2562 	pc = *pendcnt;
2563 	pvec[pc].seq = seq;
2564 	pvec[pc].apos = dset->pos;
2565 	pvec[pc].qpos = qpos;
2566 	pvec[pc].alen = dset->fbalen;
2567 	pvec[pc].pindex = pindex;
2568 	*pendcnt = pc + 1;
2569 	DTRACE_PROBE1(pvec_reply, int, seq);
2570 }
2571 
2572 /*
2573  * Enters with group->ra_queue.net_qlock held.
2574  * Tries to construct the return status data for
2575  * all the pending requests in the sleepq that it can
2576  * satisfy.
2577  */
/*
 * Drain the in-order prefix of the group's sleepq: every queued
 * request whose sequence number matches group->seq is written via
 * rdc_writemaxfba(), and each success is recorded in the piggyback
 * reply vector of `netretp'.  net_qlock is dropped around each write
 * and re-acquired afterwards; the function returns with it held, as
 * it was on entry.
 */
static void
rdc_dopending(rdc_group_t *group, netwriteres *netretp)
{
	int pendcnt;		/* piggybacked results collected so far */
	net_pendvec_t *pendvec;	/* scratch array, copied out at the end */
	rdc_sleepq_t *sq;
	int ret;
	int pendsz;

	ASSERT(MUTEX_HELD(&group->ra_queue.net_qlock));

	pendcnt = 0;
	pendsz = RDC_MAXPENDQ * sizeof (net_pendvec_t);
	pendvec = kmem_alloc(pendsz, KM_SLEEP);

	/*
	 * now look at the Q of pending tasks, attempt
	 * to write any that have been waiting for
	 * me to complete my write, and piggyback
	 * their results in my reply, by setting pendcnt
	 * to the number of extra requests successfully
	 * processed.
	 */
	while (group->sleepq && group->sleepq->seq == group->seq) {
		rdc_k_info_t		*krdc;
		rdc_u_info_t		*urdc;
		struct rdc_net_dataset	*dset;

		/* unlink the head and drop the lock while doing I/O */
		sq = group->sleepq;
		group->sleepq = sq->next;
		mutex_exit(&group->ra_queue.net_qlock);

		krdc = &rdc_k_info[sq->sindex];
		urdc = &rdc_u_info[sq->sindex];
		if (krdc->io_kstats) {
			mutex_enter(krdc->io_kstats->ks_lock);
			kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
			mutex_exit(krdc->io_kstats->ks_lock);
		}

		dset = rdc_net_get_set(sq->sindex, sq->idx);
		if (dset == NULL) {
#ifdef	DEBUG
			cmn_err(CE_NOTE, "!pending: %s:%s rdc_net_get_set "
			    "failed", urdc->secondary.intf,
			    urdc->secondary.file);
#endif
			/*
			 * as we failed to get the pointer, there
			 * is no point expecting the cleanup
			 * code in rdc_delsleepq() to get it
			 * either.
			 */
			sq->idx = -1;
			goto cleansq;
		}
		sq->idx = -1;	/* marked as cleaned up */

		ret = rdc_writemaxfba(krdc, urdc, dset, sq->seq, sq->nocache);
		if (RDC_SUCCESS(ret)) {
			/* record the success for the piggyback reply */
			rdc_setbitind(&pendcnt, pendvec, dset,
			    sq->seq, sq->pindex, sq->qpos);
		} else {
			cmn_err(CE_WARN, "!dopending: Write of pending "
			    "asynchronous task failed, with "
			    "sequence number %u for SNDR set %s:%s",
			    sq->seq, urdc->secondary.intf,
			    urdc->secondary.file);
		}
		rdc_net_del_set(sq->sindex, dset);
cleansq:
		/* advance the expected sequence, handling wraparound */
		mutex_enter(&group->ra_queue.net_qlock);
		group->seq = sq->seq + 1;
		if (group->seq < sq->seq)
			group->seq = RDC_NEWSEQ + 1;
		rdc_delsleepq(sq);
	}
	mutex_exit(&group->ra_queue.net_qlock);
	if (pendcnt) {
		int vecsz;
#ifdef DEBUG
		if (rdc_netwrite6) {
			cmn_err(CE_NOTE, "!packing pend, count %d", pendcnt);
		}
#endif
		/* copy the collected results into the RPC reply */
		vecsz = pendcnt * sizeof (net_pendvec_t);
		netretp->vecdata.vecdata_val =
		    kmem_alloc(vecsz, KM_SLEEP);
		netretp->vecdata.vecdata_len = pendcnt;
		bcopy(pendvec, netretp->vecdata.vecdata_val, vecsz);
	}
	kmem_free(pendvec, pendsz);
	mutex_enter(&group->ra_queue.net_qlock);
}
2672 
2673 /*
2674  * Take the dset and allocate and fill in the vector.
2675  */
2676 static nsc_vec_t *
2677 rdc_dset2vec(rdc_net_dataset_t *dset)
2678 {
2679 	nsc_vec_t *vecret;
2680 	int i;
2681 	rdc_net_dataitem_t *ditem;
2682 
2683 	ASSERT(dset->nitems > 0);
2684 	ASSERT(dset->head);
2685 	ASSERT(dset->tail);
2686 
2687 	vecret = kmem_alloc((dset->nitems + 1) * sizeof (nsc_vec_t),
2688 	    KM_NOSLEEP);
2689 	if (vecret == NULL) {
2690 		return (NULL);
2691 	}
2692 	RDC_DSMEMUSE((dset->nitems + 1) * sizeof (nsc_vec_t));
2693 	ditem = dset->head;
2694 	for (i = 0; i < dset->nitems; i++) {
2695 		ASSERT(ditem);
2696 		vecret[i].sv_addr = ditem->dptr;
2697 		vecret[i].sv_len = ditem->len;
2698 		ditem = ditem->next;
2699 	}
2700 	/*
2701 	 * Null terminate.
2702 	 */
2703 	vecret[i].sv_addr = NULL;
2704 	vecret[i].sv_len = 0;
2705 	/*
2706 	 * Check the list and count matches.
2707 	 */
2708 	ASSERT(ditem == NULL);
2709 	return (vecret);
2710 }
2711 
2712 /*
2713  * Split the local read into maxfba sized chunks.
2714  * Returns 0 on an error, or a valid idx on success.
2715  */
/*
 * Read `fbalen' FBAs starting at `pos' from the local volume of set
 * `cd', in chunks no larger than the device's maxfbas limit.  The
 * data is collected into a freshly allocated dataset (one data item
 * per chunk) which is published with rdc_net_put_set().  Each chunk
 * goes through rdc_combyread() so a multihop remote read can be
 * merged in.  Returns the dataset id on success, or 0 on any error
 * (all partially built state is torn down at `out').
 */
static int
rdc_readmaxfba(int cd, nsc_off_t pos, nsc_size_t fbalen, int nocache)
{
	int idx;		/* return value: dataset id, 0 = error */
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	rdc_net_dataset_t *dset;
	rdc_net_dataitem_t *ditem;
	int rtype;
	nsc_buf_t *handle;
	nsc_vec_t veclist[2];
	int ret;
	int reserved;		/* device reserve held, released at `out' */
	nsc_size_t fbaleft;	/* FBAs still to read */
	nsc_size_t mfba;	/* device max FBAs per single I/O */
	nsc_off_t fba;		/* FBAs in the current chunk */
	nsc_off_t spos;		/* current read position */
	int eintr_count;

	handle = NULL;
	idx = 0; /* error status */
	dset = NULL;
	ditem = NULL;
	reserved = 0;
	ret = 0;
	mfba = 0;

	rtype = RDC_RAW;
	krdc = &rdc_k_info[cd];
	urdc = &rdc_u_info[cd];

	/* retry a bounded number of times if the reserve is interrupted */
	eintr_count = 0;
	do {
		ret = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
		if (ret == EINTR) {
			++eintr_count;
			delay(2);
		}
	} while ((ret == EINTR) && (eintr_count < MAX_EINTR_COUNT));
	if (ret != 0) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!readmaxfba: reserve failed on set %s:%s %d",
		    urdc->secondary.intf, urdc->secondary.file,
		    ret);
#endif
		goto out;
	}
	reserved = 1;
	/*
	 * create a dataset that we can hang all the buffers from.
	 */
	dset = rdc_net_add_set(cd);
	if (dset == NULL) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!readmaxfba: Unable to allocate dset on set "
		    "%s:%s", urdc->secondary.intf, urdc->secondary.file);
#endif
		goto out;
	}
	dset->pos = pos;
	dset->fbalen = fbalen;
	ret = nsc_maxfbas(RDC_U_FD(krdc), 0, &mfba);
	if (ret != 0) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!readmaxfba: msc_maxfbas failed on set %s:%s "
		    "%d", urdc->secondary.intf,	urdc->secondary.file, ret);
#endif
		goto out;
	}
	spos = pos;
	fbaleft = fbalen;
	/* veclist[1] is the NULL terminator required by nsc_buf vectors */
	veclist[1].sv_addr = NULL;
	veclist[1].sv_len = 0;

	while (fbaleft > 0) {
		fba = min(mfba, fbaleft);
		/* release the previous chunk's buffer before reallocating */
		if (handle) {
			(void) nsc_free_buf(handle);
			handle = NULL;
		}
		/* NSC_NODATA: we supply our own data vector below */
		ret = nsc_alloc_buf(RDC_U_FD(krdc), spos, fba,
		    nocache|NSC_NODATA, &handle);
		if (ret != 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!readmaxfba: alloc failed on set"
			    "%s:%s %d", urdc->secondary.intf,
			    urdc->secondary.file, ret);
#endif
			goto out;
		}
		/* KM_NOSLEEP: on failure bail out (idx stays 0) */
		ditem = kmem_alloc(sizeof (rdc_net_dataitem_t), KM_NOSLEEP);
		if (ditem == NULL) {
			goto out;
		}
		RDC_DSMEMUSE(sizeof (rdc_net_dataitem_t));
		ditem->len = FBA_SIZE(fba);
		ditem->mlen = ditem->len;
		ditem->dptr = kmem_alloc(ditem->len, KM_SLEEP);
		RDC_DSMEMUSE(ditem->len);
		ditem->next = NULL;
		/*
		 * construct a vector list
		 */
		veclist[0].sv_addr = ditem->dptr;
		veclist[0].sv_len = ditem->len;
		handle->sb_vec = veclist;
		ret = rdc_combyread(krdc, urdc, handle);
		if (ret != 0) {
			goto out;
		}
		/*
		 * place on linked list.
		 */
		dset->nitems++;
		if (dset->head == NULL) {
			dset->head = ditem;
			dset->tail = ditem;
		} else {
			dset->tail->next = ditem;
			dset->tail = ditem;
		}
		/*
		 * now its linked, clear this so its not freed twice.
		 */
		ditem = NULL;
		fbaleft -= fba;
		spos += fba;
	}
	/*
	 * all the reads have worked, store the results.
	 */
	idx = dset->id;
	rdc_net_put_set(cd, dset);
	dset = NULL;
out:
	/* unified cleanup: frees whatever was still in flight */
	if (handle)
		(void) nsc_free_buf(handle);
	if (reserved)
		_rdc_rlse_devs(krdc, rtype);
	if (dset)
		rdc_net_del_set(cd, dset);
	if (ditem) {
		kmem_free(ditem->dptr, ditem->mlen);
		RDC_DSMEMUSE(-ditem->mlen);
		kmem_free(ditem, sizeof (*ditem));
		RDC_DSMEMUSE(-sizeof (*ditem));
	}
	return (idx);
}
2865 
2866 
2867 /*
2868  * perform both a local read, and if multihop, a remote read.
2869  * return 0 on success, or errno on failure.
2870  */
/*
 * Read the region described by `handle' from the local volume,
 * accounting the request in the run queue kstats.  A local read
 * failure marks the volume RDC_VOL_FAILED / RDC_RSYNC_NEEDED.  If the
 * set is multihop and the target needs a reverse sync, the remote
 * copy is read on top via _rdc_remote_read().  On success the read
 * kstats are updated.  Returns 0 or the error from the failing read.
 */
static int
rdc_combyread(rdc_k_info_t *krdc, rdc_u_info_t *urdc, nsc_buf_t *handle)
{
	int ret;
	rdc_k_info_t *ktmp;
	rdc_u_info_t *utmp;

	/*
	 * read it.
	 */
	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	ret = nsc_read(handle, handle->sb_pos, handle->sb_len, NSC_READ);

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	if (ret != 0) {
#ifdef DEBUG
		cmn_err(CE_NOTE, "!combyread: read failed on set %s:%s %d",
		    urdc->secondary.intf, urdc->secondary.file, ret);
#endif
		if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			rdc_many_enter(krdc);
			rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "comby read failed");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}
		goto out;
	}
	/*
	 * Assignments inside the condition chain: ktmp/utmp are set and
	 * immediately tested, so the remote read only happens for an
	 * enabled multihop target that needs a reverse sync.
	 */
	if (IS_MULTI(krdc) && (ktmp = krdc->multi_next) &&
	    (utmp = &rdc_u_info[ktmp->index]) &&
	    IS_ENABLED(utmp) &&
	    (rdc_get_mflags(utmp) & RDC_RSYNC_NEEDED)) {
		ret = _rdc_remote_read(ktmp, handle, handle->sb_pos,
		    handle->sb_len, NSC_READ);
		/*
		 * Set NSC_MIXED so
		 * that the cache will throw away this
		 * buffer when we free it since we have
		 * combined data from multiple sources
		 * into a single buffer.
		 * Currently we don't use the cache for
		 * data volumes, so comment this out.
		 * handle->sb_flag |= NSC_MIXED;
		 */
		if (ret != 0) {
#ifdef DEBUG
			cmn_err(CE_NOTE, "!combyread: remote read failed on "
			    "set %s:%s %d", utmp->secondary.intf,
			    utmp->secondary.file, ret);
#endif
			goto out;
		}
	}
	if (krdc->io_kstats) {
		KSTAT_IO_PTR(krdc->io_kstats)->reads++;
		KSTAT_IO_PTR(krdc->io_kstats)->nread +=
		    FBA_SIZE(handle->sb_len);
	}
out:
	return (ret);
}
2943 
2944 
2945 /*
2946  * remove and free all the collected dsets for this set.
2947  */
2948 void
2949 rdc_dump_dsets(int index)
2950 {
2951 	rdc_k_info_t *krdc;
2952 	rdc_net_dataset_t *dset;
2953 
2954 	krdc = &rdc_k_info[index];
2955 tloop:
2956 	mutex_enter(&krdc->dc_sleep);
2957 	while ((dset = krdc->net_dataset) != NULL) {
2958 		if (dset->inuse) {
2959 			/*
2960 			 * for the dset to be in use, the
2961 			 * service routine r_net_write6() must
2962 			 * be active with it. It will free
2963 			 * it eventually.
2964 			 */
2965 			mutex_exit(&krdc->dc_sleep);
2966 			delay(5);
2967 			goto tloop;
2968 		}
2969 		/*
2970 		 * free it.
2971 		 */
2972 		rdc_net_free_set(krdc, dset);
2973 	}
2974 	mutex_exit(&krdc->dc_sleep);
2975 }
2976 
2977 #ifdef	DEBUG
2978 void
2979 rdc_stallzero(int flag)
2980 {
2981 	static int init = 0;
2982 	static kcondvar_t cv;
2983 	static kmutex_t mu;
2984 
2985 	if (init == 0) {
2986 		cv_init(&cv, NULL, CV_DRIVER, NULL);
2987 		mutex_init(&mu, NULL, MUTEX_DRIVER, NULL);
2988 		init = 1;
2989 	}
2990 
2991 	mutex_enter(&mu);
2992 	switch (flag) {
2993 	case 0:
2994 		rdc_stall0 = 0;
2995 		cv_signal(&cv);
2996 		break;
2997 	case 1:
2998 		rdc_stall0 = 1;
2999 		break;
3000 	case 2:
3001 		while (rdc_stall0 == 1)
3002 			cv_wait(&cv, &mu);
3003 		break;
3004 	default:
3005 		cmn_err(CE_PANIC, "Bad flag value passed to rdc_stallzero");
3006 		break;
3007 	}
3008 	mutex_exit(&mu);
3009 }
3010 #endif
3011 
3012 /*
3013  * RDC protocol version 5
3014  */
/*
 * Dispatch table for protocol version 5.  Entries are indexed by
 * array position (presumably the RPC procedure number -- confirm
 * against the protocol definition); rdcsrv_noproc fills the holes.
 */
static rdc_disptab_t rdc_disptab5[] =
{
	/* PROC			Idempotent */
	{ r_net_null,		FALSE },	/* 0 */
	{ rdcsrv_noproc,	FALSE },	/* 1 */
	{ r_net_getsize,	FALSE },	/* 2 */
	{ rdcsrv_noproc,	FALSE },	/* 3 */
	{ r_net_write5,		TRUE },		/* 4 */
	{ r_net_read,		FALSE },	/* 5 */
	{ rdcsrv_noproc,	FALSE },	/* 6 */
	{ r_net_state4,		FALSE },	/* 7 */
	{ r_net_ping4,		FALSE },	/* 8 */
	{ r_net_bmap,		FALSE },	/* 9 */
	{ r_net_bdata,		FALSE },	/* 10 */
	{ rdcsrv_noproc,	FALSE },	/* 11 */
	{ r_net_getstate4,	FALSE }		/* 12 */
};
3032 
3033 /*
3034  * RDC protocol version 6
3035  */
/*
 * Dispatch table for protocol version 6.  Same layout as version 5
 * but with the 6-suffixed (64-bit capable) service routines.
 */
static rdc_disptab_t rdc_disptab6[] =
{
	/* PROC			Idempotent */
	{ r_net_null,		FALSE },	/* 0 */
	{ rdcsrv_noproc,	FALSE },	/* 1 */
	{ r_net_getsize6,	FALSE },	/* 2 */
	{ rdcsrv_noproc,	FALSE },	/* 3 */
	{ r_net_write6,		TRUE },		/* 4 */
	{ r_net_read6,		FALSE },	/* 5 */
	{ rdcsrv_noproc,	FALSE },	/* 6 */
	{ r_net_state4,		FALSE },	/* 7 */
	{ r_net_ping4,		FALSE },	/* 8 */
	{ r_net_bmap6,		FALSE },	/* 9 */
	{ r_net_bdata6,		FALSE },	/* 10 */
	{ rdcsrv_noproc,	FALSE },	/* 11 */
	{ r_net_getstate4,	FALSE }		/* 12 */
};
3053 
3054 /*
3055  * RDC protocol version 7
3056  */
/*
 * Dispatch table for protocol version 7.  Differs from version 6 in
 * the state, ping and getstate handlers (r_net_state, r_net_ping7,
 * r_net_getstate7).
 */
static rdc_disptab_t rdc_disptab7[] =
{
	/* PROC			Idempotent */
	{ r_net_null,		FALSE },	/* 0 */
	{ rdcsrv_noproc,	FALSE },	/* 1 */
	{ r_net_getsize6,	FALSE },	/* 2 */
	{ rdcsrv_noproc,	FALSE },	/* 3 */
	{ r_net_write6,		TRUE },		/* 4 */
	{ r_net_read6,		FALSE },	/* 5 */
	{ rdcsrv_noproc,	FALSE },	/* 6 */
	{ r_net_state,		FALSE },	/* 7 */
	{ r_net_ping7,		FALSE },	/* 8 */
	{ r_net_bmap6,		FALSE },	/* 9 */
	{ r_net_bdata6,		FALSE },	/* 10 */
	{ rdcsrv_noproc,	FALSE },	/* 11 */
	{ r_net_getstate7,	FALSE }		/* 12 */
};
3074 
/*
 * One entry per supported protocol version, in ascending order
 * (v5, v6, v7); each pairs a dispatch table with its entry count.
 * Handed to rdcsrv_load() when the server is started.
 */
static rdcsrv_t rdc_srvtab[] = {
	{ rdc_disptab5, sizeof (rdc_disptab5) / sizeof (*rdc_disptab5) },
	{ rdc_disptab6, sizeof (rdc_disptab6) / sizeof (*rdc_disptab6) },
	{ rdc_disptab7, sizeof (rdc_disptab7) / sizeof (*rdc_disptab7) }
};
3080