xref: /titanic_51/usr/src/uts/common/avs/ns/rdc/rdc_io.c (revision 5c5f137104b2d56181283389fa902220f2023809)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/ksynch.h>
28 #include <sys/cmn_err.h>
29 #include <sys/kmem.h>
30 #include <sys/conf.h>
31 #include <sys/errno.h>
32 #include <sys/sysmacros.h>
33 
34 #ifdef _SunOS_5_6
35 /*
36  * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
37  * define enum_t here as it is all we need from rpc/types.h
38  * anyway and make it look like we included it. Yuck.
39  */
40 #define	_RPC_TYPES_H
41 typedef int enum_t;
42 #else
43 #ifndef DS_DDICT
44 #include <rpc/types.h>
45 #endif
46 #endif /* _SunOS_5_6 */
47 
48 #include <sys/ddi.h>
49 
50 #include <sys/nsc_thread.h>
51 #include <sys/nsctl/nsctl.h>
52 
53 #include <sys/sdt.h>		/* dtrace is S10 or later */
54 
55 #include "rdc_io.h"
56 #include "rdc_bitmap.h"
57 #include "rdc_update.h"
58 #include "rdc_ioctl.h"
59 #include "rdcsrv.h"
60 #include "rdc_diskq.h"
61 
62 #include <sys/unistat/spcs_s.h>
63 #include <sys/unistat/spcs_s_k.h>
64 #include <sys/unistat/spcs_errors.h>
65 
/*
 * Module life-cycle state.  Set to ATM_NONE by _rdc_load(), to ATM_INIT by
 * _rdc_configure() and to ATM_EXIT by _rdc_deconfigure() and _rdc_unload().
 */
volatile int net_exit;
/* Set by _rdc_configure() to FBA_LEN(RDC_MAXDATA) */
nsc_size_t MAX_RDC_FBAS;

#ifdef DEBUG
/* Thread count handed to nst_init() for the "rdc_syncthr" threadset */
int RDC_MAX_SYNC_THREADS = 8;
int rdc_maxthreads_last = 8;
#endif

kmutex_t rdc_ping_lock;		/* Ping lock */
/* Initialised in _rdc_load(), destroyed in _rdc_unload() */
static kmutex_t net_blk_lock;
76 
77 /*
78  * rdc_conf_lock is used as a global device configuration lock.
79  * It is also used by enable/resume and disable/suspend code to ensure that
80  * the transition of an rdc set between configured and unconfigured is
81  * atomic.
82  *
83  * krdc->group->lock is used to protect state changes of a configured rdc
84  * set (e.g. changes to urdc->flags), such as enabled to disabled and vice
85  * versa.
86  *
87  * rdc_many_lock is also used to protect changes in group membership. A group
88  * linked list cannot change while this lock is held. The many list and the
89  * multi-hop list are both protected by rdc_many_lock.
90  */
kmutex_t rdc_conf_lock;
kmutex_t rdc_many_lock;			/* Many/multi-list lock */

static kmutex_t rdc_net_hnd_id_lock;	/* Network handle id lock */
int rdc_debug = 0;
int rdc_debug_sleep = 0;

static int rdc_net_hnd_id = 1;

/* Defined elsewhere; initialised and destroyed by _rdc_load()/_rdc_unload() */
extern kmutex_t rdc_clnt_lock;

static void rdc_ditemsfree(rdc_net_dataset_t *);
void rdc_clnt_destroy(void);

/*
 * Per-set kernel and user state.  Both are arrays of rdc_max_sets entries
 * allocated by _rdc_load() and freed by _rdc_unload(); entry i of one
 * array corresponds to entry i of the other.
 */
rdc_k_info_t *rdc_k_info;
rdc_u_info_t *rdc_u_info;

/* Set by _rdc_configure() to 120 seconds worth of clock ticks */
unsigned long rdc_async_timeout;

/*
 * Defaults that rdc_u_init() copies into each set's maxqfbas, maxqitems
 * and asyncthr fields respectively.
 */
nsc_size_t rdc_maxthres_queue = RDC_MAXTHRES_QUEUE;
int rdc_max_qitems = RDC_MAX_QITEMS;
int rdc_asyncthr = RDC_ASYNCTHR;
/* Handle for the "RDCVolumeUpdated" service registered in _rdc_load() */
static nsc_svc_t *rdc_volume_update;
static int rdc_prealloc_handle = 1;

extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);
118 
119 /*
120  * Forward declare all statics that are used before defined
121  * to enforce parameter checking
122  *
123  * Some (if not all) of these could be removed if the code were reordered
124  */
125 
126 static void rdc_volume_update_svc(intptr_t);
127 static void halt_sync(rdc_k_info_t *krdc);
128 void rdc_kstat_create(int index);
129 void rdc_kstat_delete(int index);
130 static int rdc_checkforbitmap(int, nsc_off_t);
131 static int rdc_installbitmap(int, void *, int, nsc_off_t, int, int *, int);
132 static rdc_group_t *rdc_newgroup();
133 
134 int rdc_enable_diskq(rdc_k_info_t *krdc);
135 void rdc_close_diskq(rdc_group_t *group);
136 int rdc_suspend_diskq(rdc_k_info_t *krdc);
137 int rdc_resume_diskq(rdc_k_info_t *krdc);
138 void rdc_init_diskq_header(rdc_group_t *grp, dqheader *header);
139 void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int dolog);
140 void rdc_unfail_diskq(rdc_k_info_t *krdc);
141 void rdc_unintercept_diskq(rdc_group_t *grp);
142 int rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int flags);
143 void rdc_qfiller_thr(rdc_k_info_t *krdc);
144 
145 nstset_t *_rdc_ioset;
146 nstset_t *_rdc_flset;
147 
/*
 * RDC threadset tunables
 *
 * rdc_threads is the size _rdc_ioset is created with, and the floor below
 * which rdc_thread_tune() will not shrink it.  rdc_threads_inc is the
 * number of threads added or removed per resize.
 */
int rdc_threads = 64;		/* default number of threads */
int rdc_threads_inc = 8;	/* increment for changing the size of the set */

/*
 * Private threadset manipulation variables
 */
static int rdc_threads_hysteresis = 2;
				/* hysteresis for threadset resizing */
static int rdc_sets_active;	/* number of sets currently enabled */

#ifdef DEBUG
/* Initialised in _rdc_load(), destroyed in _rdc_unload() */
kmutex_t rdc_cntlock;
#endif
164 
165 /*
166  * rdc_thread_deconfigure - rdc is being deconfigured, stop any
167  * thread activity.
168  *
169  * Inherently single-threaded by the Solaris module unloading code.
170  */
171 static void
172 rdc_thread_deconfigure(void)
173 {
174 	nst_destroy(_rdc_ioset);
175 	_rdc_ioset = NULL;
176 
177 	nst_destroy(_rdc_flset);
178 	_rdc_flset = NULL;
179 
180 	nst_destroy(sync_info.rdc_syncset);
181 	sync_info.rdc_syncset = NULL;
182 }
183 
184 /*
185  * rdc_thread_configure - rdc is being configured, initialize the
186  * threads we need for flushing aync volumes.
187  *
188  * Must be called with rdc_conf_lock held.
189  */
190 static int
191 rdc_thread_configure(void)
192 {
193 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
194 
195 	if ((_rdc_ioset = nst_init("rdc_thr", rdc_threads)) == NULL)
196 		return (EINVAL);
197 
198 	if ((_rdc_flset = nst_init("rdc_flushthr", 2)) == NULL)
199 		return (EINVAL);
200 
201 	if ((sync_info.rdc_syncset =
202 	    nst_init("rdc_syncthr", RDC_MAX_SYNC_THREADS)) == NULL)
203 		return (EINVAL);
204 
205 	return (0);
206 }
207 
208 
209 /*
210  * rdc_thread_tune - called to tune the size of the rdc threadset.
211  *
212  * Called from the config code when an rdc_set has been enabled or disabled.
213  * 'sets' is the increment to the number of active rdc_sets.
214  *
215  * Must be called with rdc_conf_lock held.
216  */
217 static void
218 rdc_thread_tune(int sets)
219 {
220 	int incr = (sets > 0) ? 1 : -1;
221 	int change = 0;
222 	int nthreads;
223 
224 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
225 
226 	if (sets < 0)
227 		sets = -sets;
228 
229 	while (sets--) {
230 		nthreads = nst_nthread(_rdc_ioset);
231 		rdc_sets_active += incr;
232 
233 		if (rdc_sets_active >= nthreads)
234 			change += nst_add_thread(_rdc_ioset, rdc_threads_inc);
235 		else if ((rdc_sets_active <
236 		    (nthreads - (rdc_threads_inc + rdc_threads_hysteresis))) &&
237 		    ((nthreads - rdc_threads_inc) >= rdc_threads))
238 			change -= nst_del_thread(_rdc_ioset, rdc_threads_inc);
239 	}
240 
241 #ifdef DEBUG
242 	if (change) {
243 		cmn_err(CE_NOTE, "!rdc_thread_tune: "
244 		    "nsets %d, nthreads %d, nthreads change %d",
245 		    rdc_sets_active, nst_nthread(_rdc_ioset), change);
246 	}
247 #endif
248 }
249 
250 
251 /*
252  * _rdc_unload() - cache is being unloaded,
253  * deallocate any dual copy structures allocated during cache
254  * loading.
255  */
256 void
257 _rdc_unload(void)
258 {
259 	int i;
260 	rdc_k_info_t *krdc;
261 
262 	if (rdc_volume_update) {
263 		(void) nsc_unregister_svc(rdc_volume_update);
264 		rdc_volume_update = NULL;
265 	}
266 
267 	rdc_thread_deconfigure();
268 
269 	if (rdc_k_info != NULL) {
270 		for (i = 0; i < rdc_max_sets; i++) {
271 			krdc = &rdc_k_info[i];
272 			mutex_destroy(&krdc->dc_sleep);
273 			mutex_destroy(&krdc->bmapmutex);
274 			mutex_destroy(&krdc->kstat_mutex);
275 			mutex_destroy(&krdc->bmp_kstat_mutex);
276 			mutex_destroy(&krdc->syncbitmutex);
277 			cv_destroy(&krdc->busycv);
278 			cv_destroy(&krdc->closingcv);
279 			cv_destroy(&krdc->haltcv);
280 			cv_destroy(&krdc->synccv);
281 		}
282 	}
283 
284 	mutex_destroy(&sync_info.lock);
285 	mutex_destroy(&rdc_ping_lock);
286 	mutex_destroy(&net_blk_lock);
287 	mutex_destroy(&rdc_conf_lock);
288 	mutex_destroy(&rdc_many_lock);
289 	mutex_destroy(&rdc_net_hnd_id_lock);
290 	mutex_destroy(&rdc_clnt_lock);
291 #ifdef DEBUG
292 	mutex_destroy(&rdc_cntlock);
293 #endif
294 	net_exit = ATM_EXIT;
295 
296 	if (rdc_k_info != NULL)
297 		kmem_free(rdc_k_info, sizeof (*rdc_k_info) * rdc_max_sets);
298 	if (rdc_u_info != NULL)
299 		kmem_free(rdc_u_info, sizeof (*rdc_u_info) * rdc_max_sets);
300 	rdc_k_info = NULL;
301 	rdc_u_info = NULL;
302 	rdc_max_sets = 0;
303 }
304 
305 
306 /*
307  * _rdc_load() - rdc is being loaded, Allocate anything
308  * that will be needed while the cache is loaded but doesn't really
309  * depend on configuration parameters.
310  *
311  */
312 int
313 _rdc_load(void)
314 {
315 	int i;
316 	rdc_k_info_t *krdc;
317 
318 	mutex_init(&rdc_ping_lock, NULL, MUTEX_DRIVER, NULL);
319 	mutex_init(&net_blk_lock, NULL, MUTEX_DRIVER, NULL);
320 	mutex_init(&rdc_conf_lock, NULL, MUTEX_DRIVER, NULL);
321 	mutex_init(&rdc_many_lock, NULL, MUTEX_DRIVER, NULL);
322 	mutex_init(&rdc_net_hnd_id_lock, NULL, MUTEX_DRIVER, NULL);
323 	mutex_init(&rdc_clnt_lock, NULL, MUTEX_DRIVER, NULL);
324 	mutex_init(&sync_info.lock, NULL, MUTEX_DRIVER, NULL);
325 
326 #ifdef DEBUG
327 	mutex_init(&rdc_cntlock, NULL, MUTEX_DRIVER, NULL);
328 #endif
329 
330 	if ((i = nsc_max_devices()) < rdc_max_sets)
331 		rdc_max_sets = i;
332 	/* following case for partial installs that may fail */
333 	if (!rdc_max_sets)
334 		rdc_max_sets = 1024;
335 
336 	rdc_k_info = kmem_zalloc(sizeof (*rdc_k_info) * rdc_max_sets, KM_SLEEP);
337 	if (!rdc_k_info)
338 		return (ENOMEM);
339 
340 	rdc_u_info = kmem_zalloc(sizeof (*rdc_u_info) * rdc_max_sets, KM_SLEEP);
341 	if (!rdc_u_info) {
342 		kmem_free(rdc_k_info, sizeof (*rdc_k_info) * rdc_max_sets);
343 		return (ENOMEM);
344 	}
345 
346 	net_exit = ATM_NONE;
347 	for (i = 0; i < rdc_max_sets; i++) {
348 		krdc = &rdc_k_info[i];
349 		bzero(krdc, sizeof (*krdc));
350 		krdc->index = i;
351 		mutex_init(&krdc->dc_sleep, NULL, MUTEX_DRIVER, NULL);
352 		mutex_init(&krdc->bmapmutex, NULL, MUTEX_DRIVER, NULL);
353 		mutex_init(&krdc->kstat_mutex, NULL, MUTEX_DRIVER, NULL);
354 		mutex_init(&krdc->bmp_kstat_mutex, NULL, MUTEX_DRIVER, NULL);
355 		mutex_init(&krdc->syncbitmutex, NULL, MUTEX_DRIVER, NULL);
356 		cv_init(&krdc->busycv, NULL, CV_DRIVER, NULL);
357 		cv_init(&krdc->closingcv, NULL, CV_DRIVER, NULL);
358 		cv_init(&krdc->haltcv, NULL, CV_DRIVER, NULL);
359 		cv_init(&krdc->synccv, NULL, CV_DRIVER, NULL);
360 	}
361 
362 	rdc_volume_update = nsc_register_svc("RDCVolumeUpdated",
363 	    rdc_volume_update_svc);
364 
365 	return (0);
366 }
367 
368 static void
369 rdc_u_init(rdc_u_info_t *urdc)
370 {
371 	const int index = (int)(urdc - &rdc_u_info[0]);
372 
373 	if (urdc->secondary.addr.maxlen)
374 		free_rdc_netbuf(&urdc->secondary.addr);
375 	if (urdc->primary.addr.maxlen)
376 		free_rdc_netbuf(&urdc->primary.addr);
377 
378 	bzero(urdc, sizeof (rdc_u_info_t));
379 
380 	urdc->index = index;
381 	urdc->maxqfbas = rdc_maxthres_queue;
382 	urdc->maxqitems = rdc_max_qitems;
383 	urdc->asyncthr = rdc_asyncthr;
384 }
385 
386 /*
387  * _rdc_configure() - cache is being configured.
388  *
389  * Initialize dual copy structures
390  */
391 int
392 _rdc_configure(void)
393 {
394 	int index;
395 	rdc_k_info_t *krdc;
396 
397 	for (index = 0; index < rdc_max_sets; index++) {
398 		krdc = &rdc_k_info[index];
399 
400 		krdc->remote_index = -1;
401 		krdc->dcio_bitmap = NULL;
402 		krdc->bitmap_ref = NULL;
403 		krdc->bitmap_size = 0;
404 		krdc->bitmap_write = 0;
405 		krdc->disk_status = 0;
406 		krdc->many_next = krdc;
407 
408 		rdc_u_init(&rdc_u_info[index]);
409 	}
410 
411 	rdc_async_timeout = 120 * HZ;   /* Seconds * HZ */
412 	MAX_RDC_FBAS = FBA_LEN(RDC_MAXDATA);
413 	if (net_exit != ATM_INIT) {
414 		net_exit = ATM_INIT;
415 		return (0);
416 	}
417 	return (0);
418 }
419 
420 /*
421  * _rdc_deconfigure - rdc is being deconfigured, shut down any
422  * dual copy operations and return to an unconfigured state.
423  */
424 void
425 _rdc_deconfigure(void)
426 {
427 	rdc_k_info_t *krdc;
428 	rdc_u_info_t *urdc;
429 	int index;
430 
431 	for (index = 0; index < rdc_max_sets; index++) {
432 		krdc = &rdc_k_info[index];
433 		urdc = &rdc_u_info[index];
434 
435 		krdc->remote_index = -1;
436 		krdc->dcio_bitmap = NULL;
437 		krdc->bitmap_ref = NULL;
438 		krdc->bitmap_size = 0;
439 		krdc->bitmap_write = 0;
440 		krdc->disk_status = 0;
441 		krdc->many_next = krdc;
442 
443 		if (urdc->primary.addr.maxlen)
444 			free_rdc_netbuf(&(urdc->primary.addr));
445 
446 		if (urdc->secondary.addr.maxlen)
447 			free_rdc_netbuf(&(urdc->secondary.addr));
448 
449 		bzero(urdc, sizeof (rdc_u_info_t));
450 		urdc->index = index;
451 	}
452 	net_exit = ATM_EXIT;
453 	rdc_clnt_destroy();
454 
455 }
456 
457 
458 /*
459  * Lock primitives, containing checks that lock ordering isn't broken
460  */
461 /*ARGSUSED*/
462 void
463 rdc_many_enter(rdc_k_info_t *krdc)
464 {
465 	ASSERT(!MUTEX_HELD(&krdc->bmapmutex));
466 
467 	mutex_enter(&rdc_many_lock);
468 }
469 
470 /* ARGSUSED */
471 void
472 rdc_many_exit(rdc_k_info_t *krdc)
473 {
474 	mutex_exit(&rdc_many_lock);
475 }
476 
477 void
478 rdc_group_enter(rdc_k_info_t *krdc)
479 {
480 	ASSERT(!MUTEX_HELD(&rdc_many_lock));
481 	ASSERT(!MUTEX_HELD(&rdc_conf_lock));
482 	ASSERT(!MUTEX_HELD(&krdc->bmapmutex));
483 
484 	mutex_enter(&krdc->group->lock);
485 }
486 
487 void
488 rdc_group_exit(rdc_k_info_t *krdc)
489 {
490 	mutex_exit(&krdc->group->lock);
491 }
492 
493 /*
494  * Suspend and disable operations use this function to wait until it is safe
495  * to do continue, without trashing data structures used by other ioctls.
496  */
497 static void
498 wait_busy(rdc_k_info_t *krdc)
499 {
500 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
501 
502 	while (krdc->busy_count > 0)
503 		cv_wait(&krdc->busycv, &rdc_conf_lock);
504 }
505 
506 
507 /*
508  * Other ioctls use this function to hold off disable and suspend.
509  */
510 void
511 set_busy(rdc_k_info_t *krdc)
512 {
513 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
514 
515 	wait_busy(krdc);
516 
517 	krdc->busy_count++;
518 }
519 
520 
521 /*
522  * Other ioctls use this function to allow disable and suspend to continue.
523  */
524 void
525 wakeup_busy(rdc_k_info_t *krdc)
526 {
527 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
528 
529 	if (krdc->busy_count <= 0)
530 		return;
531 
532 	krdc->busy_count--;
533 	cv_broadcast(&krdc->busycv);
534 }
535 
536 
537 /*
538  * Remove the rdc set from its group, and destroy the group if no longer in
539  * use.
540  */
541 static void
542 remove_from_group(rdc_k_info_t *krdc)
543 {
544 	rdc_k_info_t *p;
545 	rdc_group_t *group;
546 
547 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
548 
549 	rdc_many_enter(krdc);
550 	group = krdc->group;
551 
552 	group->count--;
553 
554 	/*
555 	 * lock queue while looking at thrnum
556 	 */
557 	mutex_enter(&group->ra_queue.net_qlock);
558 	if ((group->rdc_thrnum == 0) && (group->count == 0)) {
559 
560 		/*
561 		 * Assure the we've stopped and the flusher thread has not
562 		 * fallen back to sleep
563 		 */
564 		if (krdc->group->ra_queue.qfill_sleeping != RDC_QFILL_DEAD) {
565 			group->ra_queue.qfflags |= RDC_QFILLSTOP;
566 			while (krdc->group->ra_queue.qfflags & RDC_QFILLSTOP) {
567 				if (krdc->group->ra_queue.qfill_sleeping ==
568 				    RDC_QFILL_ASLEEP)
569 					cv_broadcast(&group->ra_queue.qfcv);
570 				mutex_exit(&group->ra_queue.net_qlock);
571 				delay(2);
572 				mutex_enter(&group->ra_queue.net_qlock);
573 			}
574 		}
575 		mutex_exit(&group->ra_queue.net_qlock);
576 
577 		mutex_enter(&group->diskqmutex);
578 		rdc_close_diskq(group);
579 		mutex_exit(&group->diskqmutex);
580 		rdc_delgroup(group);
581 		rdc_many_exit(krdc);
582 		krdc->group = NULL;
583 		return;
584 	}
585 	mutex_exit(&group->ra_queue.net_qlock);
586 	/*
587 	 * Always clear the group field.
588 	 * no, you need it set in rdc_flush_memq().
589 	 * to call rdc_group_log()
590 	 * krdc->group = NULL;
591 	 */
592 
593 	/* Take this rdc structure off the group list */
594 
595 	for (p = krdc->group_next; p->group_next != krdc; p = p->group_next)
596 	;
597 	p->group_next = krdc->group_next;
598 
599 	rdc_many_exit(krdc);
600 }
601 
602 
603 /*
604  * Add the rdc set to its group, setting up a new group if it's the first one.
605  */
606 static int
607 add_to_group(rdc_k_info_t *krdc, int options, int cmd)
608 {
609 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
610 	rdc_u_info_t *utmp;
611 	rdc_k_info_t *ktmp;
612 	int index;
613 	rdc_group_t *group;
614 	int rc = 0;
615 	nsthread_t *trc;
616 
617 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
618 
619 	/*
620 	 * Look for matching group name, primary host name and secondary
621 	 * host name.
622 	 */
623 
624 	rdc_many_enter(krdc);
625 	for (index = 0; index < rdc_max_sets; index++) {
626 		utmp = &rdc_u_info[index];
627 		ktmp = &rdc_k_info[index];
628 
629 		if (urdc->group_name[0] == 0)
630 			break;
631 
632 		if (!IS_CONFIGURED(ktmp))
633 			continue;
634 
635 		if (strncmp(utmp->group_name, urdc->group_name,
636 		    NSC_MAXPATH) != 0)
637 			continue;
638 		if (strncmp(utmp->primary.intf, urdc->primary.intf,
639 		    MAX_RDC_HOST_SIZE) != 0) {
640 			/* Same group name, different primary interface */
641 			rdc_many_exit(krdc);
642 			return (-1);
643 		}
644 		if (strncmp(utmp->secondary.intf, urdc->secondary.intf,
645 		    MAX_RDC_HOST_SIZE) != 0) {
646 			/* Same group name, different secondary interface */
647 			rdc_many_exit(krdc);
648 			return (-1);
649 		}
650 
651 		/* Group already exists, so add this set to the group */
652 
653 		if (((options & RDC_OPT_ASYNC) == 0) &&
654 		    ((ktmp->type_flag & RDC_ASYNCMODE) != 0)) {
655 			/* Must be same mode as existing group members */
656 			rdc_many_exit(krdc);
657 			return (-1);
658 		}
659 		if (((options & RDC_OPT_ASYNC) != 0) &&
660 		    ((ktmp->type_flag & RDC_ASYNCMODE) == 0)) {
661 			/* Must be same mode as existing group members */
662 			rdc_many_exit(krdc);
663 			return (-1);
664 		}
665 
666 		/* cannont reconfigure existing group into new queue this way */
667 		if ((cmd != RDC_CMD_RESUME) &&
668 		    !RDC_IS_DISKQ(ktmp->group) && urdc->disk_queue[0] != '\0') {
669 			rdc_many_exit(krdc);
670 			return (RDC_EQNOADD);
671 		}
672 
673 		ktmp->group->count++;
674 		krdc->group = ktmp->group;
675 		krdc->group_next = ktmp->group_next;
676 		ktmp->group_next = krdc;
677 
678 		urdc->autosync = utmp->autosync;	/* Same as rest */
679 
680 		(void) strncpy(urdc->disk_queue, utmp->disk_queue, NSC_MAXPATH);
681 
682 		rdc_many_exit(krdc);
683 		return (0);
684 	}
685 
686 	/* This must be a new group */
687 	group = rdc_newgroup();
688 	krdc->group = group;
689 	krdc->group_next = krdc;
690 	urdc->autosync = -1;	/* Unknown */
691 
692 	/*
693 	 * Tune the thread set by one for each thread created
694 	 */
695 	rdc_thread_tune(1);
696 
697 	trc = nst_create(_rdc_ioset, rdc_qfiller_thr, (void *)krdc, NST_SLEEP);
698 	if (trc == NULL) {
699 		rc = -1;
700 		cmn_err(CE_NOTE, "!unable to create queue filler daemon");
701 		goto fail;
702 	}
703 
704 	if (urdc->disk_queue[0] == '\0') {
705 		krdc->group->flags |= RDC_MEMQUE;
706 	} else {
707 		krdc->group->flags |= RDC_DISKQUE;
708 
709 		/* XXX check here for resume or enable and act accordingly */
710 
711 		if (cmd == RDC_CMD_RESUME) {
712 			rc = rdc_resume_diskq(krdc);
713 
714 		} else if (cmd == RDC_CMD_ENABLE) {
715 			rc = rdc_enable_diskq(krdc);
716 			if ((rc == RDC_EQNOADD) && (cmd != RDC_CMD_ENABLE)) {
717 				cmn_err(CE_WARN, "!disk queue %s enable failed,"
718 				    " enabling memory queue",
719 				    urdc->disk_queue);
720 				krdc->group->flags &= ~RDC_DISKQUE;
721 				krdc->group->flags |= RDC_MEMQUE;
722 				bzero(urdc->disk_queue, NSC_MAXPATH);
723 			}
724 		}
725 	}
726 fail:
727 	rdc_many_exit(krdc);
728 	return (rc);
729 }
730 
731 
/*
 * Move the set to a new group if possible
 *
 * The target group is the one named by urdc->group_name.  If a configured
 * set other than krdc already uses that name, krdc joins its group,
 * provided the primary/secondary interfaces and the sync/async mode match.
 * Otherwise a fresh group (with its own queue filler thread) is created.
 * On success the set's async flags are updated from 'options', and the old
 * group is destroyed if this was its last member and it has no writer;
 * if not, the set is just unlinked from the old group's list.
 *
 * Returns 0 on success.  On failure returns -1, or the negative value
 * returned by rdc_enable_diskq(), after putting back the saved disk queue
 * name.
 *
 * Must be called with rdc_conf_lock held; takes rdc_many_lock itself.
 *
 * NOTE(review): when a failure happens after the "new group" branch has
 * run, krdc->group and krdc->group_next already point at the new group
 * and are not put back before "bad:", even though old_group still counts
 * this set and its list still links to it.  The new group is not released
 * either.  This looks like it contradicts "Leave existing group status
 * alone" - confirm against the callers.
 */
static int
change_group(rdc_k_info_t *krdc, int options)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_u_info_t *utmp;
	rdc_k_info_t *ktmp;
	rdc_k_info_t *next;		/* krdc's successor in the old group */
	char tmpq[NSC_MAXPATH];		/* saved disk queue name */
	int index;
	int rc = -1;
	rdc_group_t *group, *old_group;
	nsthread_t *trc;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	/*
	 * Look for matching group name, primary host name and secondary
	 * host name.
	 */

	bzero(&tmpq, sizeof (tmpq));
	rdc_many_enter(krdc);

	/* remember where the set came from before anything is relinked */
	old_group = krdc->group;
	next = krdc->group_next;

	if (RDC_IS_DISKQ(old_group)) { /* can't keep your own queue */
		(void) strncpy(tmpq, urdc->disk_queue, NSC_MAXPATH);
		bzero(urdc->disk_queue, sizeof (urdc->disk_queue));
	}
	for (index = 0; index < rdc_max_sets; index++) {
		utmp = &rdc_u_info[index];
		ktmp = &rdc_k_info[index];

		/* the set cannot be matched against itself */
		if (ktmp == krdc)
			continue;

		/* no group name: nothing to match, go and make a new group */
		if (urdc->group_name[0] == 0)
			break;

		if (!IS_CONFIGURED(ktmp))
			continue;

		if (strncmp(utmp->group_name, urdc->group_name,
		    NSC_MAXPATH) != 0)
			continue;
		/* same group name, so both interfaces must be the same too */
		if (strncmp(utmp->primary.intf, urdc->primary.intf,
		    MAX_RDC_HOST_SIZE) != 0)
			goto bad;
		if (strncmp(utmp->secondary.intf, urdc->secondary.intf,
		    MAX_RDC_HOST_SIZE) != 0)
			goto bad;

		/* Group already exists, so add this set to the group */

		if (((options & RDC_OPT_ASYNC) == 0) &&
		    ((ktmp->type_flag & RDC_ASYNCMODE) != 0)) {
			/* Must be same mode as existing group members */
			goto bad;
		}
		if (((options & RDC_OPT_ASYNC) != 0) &&
		    ((ktmp->type_flag & RDC_ASYNCMODE) == 0)) {
			/* Must be same mode as existing group members */
			goto bad;
		}

		/* link krdc into the matched group's list, right after ktmp */
		ktmp->group->count++;
		krdc->group = ktmp->group;
		krdc->group_next = ktmp->group_next;
		ktmp->group_next = krdc;
		/* adopt the matched group's disk queue name */
		bzero(urdc->disk_queue, sizeof (urdc->disk_queue));
		(void) strncpy(urdc->disk_queue, utmp->disk_queue, NSC_MAXPATH);

		goto good;
	}

	/* This must be a new group */
	group = rdc_newgroup();
	krdc->group = group;
	krdc->group_next = krdc;

	trc = nst_create(_rdc_ioset, rdc_qfiller_thr, (void *)krdc, NST_SLEEP);
	if (trc == NULL) {
		rc = -1;
		cmn_err(CE_NOTE, "!unable to create queue filler daemon");
		goto bad;
	}

	if (urdc->disk_queue[0] == 0) {
		krdc->group->flags |= RDC_MEMQUE;
	} else {
		krdc->group->flags |= RDC_DISKQUE;
		if ((rc = rdc_enable_diskq(krdc)) < 0)
			goto bad;
	}
good:
	/* record the requested replication mode on the set */
	if (options & RDC_OPT_ASYNC) {
		krdc->type_flag |= RDC_ASYNCMODE;
		rdc_set_flags(urdc, RDC_ASYNC);
	} else {
		krdc->type_flag &= ~RDC_ASYNCMODE;
		rdc_clr_flags(urdc, RDC_ASYNC);
	}

	old_group->count--;
	if (!old_group->rdc_writer && old_group->count == 0) {
		/* Group now empty, so destroy */
		if (RDC_IS_DISKQ(old_group)) {
			rdc_unintercept_diskq(old_group);
			mutex_enter(&old_group->diskqmutex);
			rdc_close_diskq(old_group);
			mutex_exit(&old_group->diskqmutex);
		}

		mutex_enter(&old_group->ra_queue.net_qlock);

		/*
		 * Make sure the queue filler has stopped and has not
		 * fallen back to sleep: keep waking it, with the queue
		 * lock dropped for two ticks at a time, until
		 * RDC_QFILLSTOP has been cleared.
		 */
		if (old_group->ra_queue.qfill_sleeping != RDC_QFILL_DEAD) {
			old_group->ra_queue.qfflags |= RDC_QFILLSTOP;
			while (old_group->ra_queue.qfflags & RDC_QFILLSTOP) {
				if (old_group->ra_queue.qfill_sleeping ==
				    RDC_QFILL_ASLEEP)
					cv_broadcast(&old_group->ra_queue.qfcv);
				mutex_exit(&old_group->ra_queue.net_qlock);
				delay(2);
				mutex_enter(&old_group->ra_queue.net_qlock);
			}
		}
		mutex_exit(&old_group->ra_queue.net_qlock);

		rdc_delgroup(old_group);
		rdc_many_exit(krdc);
		return (0);
	}

	/* Take this rdc structure off the old group list */

	/* walk from the saved successor round to krdc's old predecessor */
	for (ktmp = next; ktmp->group_next != krdc; ktmp = ktmp->group_next)
	;
	ktmp->group_next = next;

	rdc_many_exit(krdc);
	return (0);

bad:
	/* Leave existing group status alone */
	/* tmpq is all zeroes unless the old group used a disk queue */
	(void) strncpy(urdc->disk_queue, tmpq, NSC_MAXPATH);
	rdc_many_exit(krdc);
	return (rc);
}
888 
889 
890 /*
891  * Set flags for an rdc set, setting the group flags as necessary.
892  */
893 void
894 rdc_set_flags(rdc_u_info_t *urdc, int flags)
895 {
896 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
897 	int vflags, sflags, bflags, ssflags;
898 
899 	DTRACE_PROBE2(rdc_set_flags, int, krdc->index, int, flags);
900 	vflags = flags & RDC_VFLAGS;
901 	sflags = flags & RDC_SFLAGS;
902 	bflags = flags & RDC_BFLAGS;
903 	ssflags = flags & RDC_SYNC_STATE_FLAGS;
904 
905 	if (vflags) {
906 		/* normal volume flags */
907 		ASSERT(MUTEX_HELD(&rdc_conf_lock) ||
908 		    MUTEX_HELD(&krdc->group->lock));
909 		if (ssflags)
910 			mutex_enter(&krdc->bmapmutex);
911 
912 		urdc->flags |= vflags;
913 
914 		if (ssflags)
915 			mutex_exit(&krdc->bmapmutex);
916 	}
917 
918 	if (sflags) {
919 		/* Sync state flags that are protected by a different lock */
920 		ASSERT(MUTEX_HELD(&rdc_many_lock));
921 		urdc->sync_flags |= sflags;
922 	}
923 
924 	if (bflags) {
925 		/* Bmap state flags that are protected by a different lock */
926 		ASSERT(MUTEX_HELD(&krdc->bmapmutex));
927 		urdc->bmap_flags |= bflags;
928 	}
929 
930 }
931 
932 
933 /*
934  * Clear flags for an rdc set, clearing the group flags as necessary.
935  */
936 void
937 rdc_clr_flags(rdc_u_info_t *urdc, int flags)
938 {
939 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
940 	int vflags, sflags, bflags;
941 
942 	DTRACE_PROBE2(rdc_clr_flags, int, krdc->index, int, flags);
943 	vflags = flags & RDC_VFLAGS;
944 	sflags = flags & RDC_SFLAGS;
945 	bflags = flags & RDC_BFLAGS;
946 
947 	if (vflags) {
948 		/* normal volume flags */
949 		ASSERT(MUTEX_HELD(&rdc_conf_lock) ||
950 		    MUTEX_HELD(&krdc->group->lock));
951 		urdc->flags &= ~vflags;
952 
953 	}
954 
955 	if (sflags) {
956 		/* Sync state flags that are protected by a different lock */
957 		ASSERT(MUTEX_HELD(&rdc_many_lock));
958 		urdc->sync_flags &= ~sflags;
959 	}
960 
961 	if (bflags) {
962 		/* Bmap state flags that are protected by a different lock */
963 		ASSERT(MUTEX_HELD(&krdc->bmapmutex));
964 		urdc->bmap_flags &= ~bflags;
965 	}
966 }
967 
968 
969 /*
970  * Get the flags for an rdc set.
971  */
972 int
973 rdc_get_vflags(rdc_u_info_t *urdc)
974 {
975 	return (urdc->flags | urdc->sync_flags | urdc->bmap_flags);
976 }
977 
978 
979 /*
980  * Initialise flags for an rdc set.
981  */
982 static void
983 rdc_init_flags(rdc_u_info_t *urdc)
984 {
985 	urdc->flags = 0;
986 	urdc->mflags = 0;
987 	urdc->sync_flags = 0;
988 	urdc->bmap_flags = 0;
989 }
990 
991 
992 /*
993  * Set flags for a many group.
994  */
995 void
996 rdc_set_mflags(rdc_u_info_t *urdc, int flags)
997 {
998 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
999 	rdc_k_info_t *this = krdc;
1000 
1001 	ASSERT(!(flags & ~RDC_MFLAGS));
1002 
1003 	if (flags == 0)
1004 		return;
1005 
1006 	ASSERT(MUTEX_HELD(&rdc_many_lock));
1007 
1008 	rdc_set_flags(urdc, flags);	/* set flags on local urdc */
1009 
1010 	urdc->mflags |= flags;
1011 	for (krdc = krdc->many_next; krdc != this; krdc = krdc->many_next) {
1012 		urdc = &rdc_u_info[krdc->index];
1013 		if (!IS_ENABLED(urdc))
1014 			continue;
1015 		urdc->mflags |= flags;
1016 	}
1017 }
1018 
1019 
1020 /*
1021  * Clear flags for a many group.
1022  */
1023 void
1024 rdc_clr_mflags(rdc_u_info_t *urdc, int flags)
1025 {
1026 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
1027 	rdc_k_info_t *this = krdc;
1028 	rdc_u_info_t *utmp;
1029 
1030 	ASSERT(!(flags & ~RDC_MFLAGS));
1031 
1032 	if (flags == 0)
1033 		return;
1034 
1035 	ASSERT(MUTEX_HELD(&rdc_many_lock));
1036 
1037 	rdc_clr_flags(urdc, flags);	/* clear flags on local urdc */
1038 
1039 	/*
1040 	 * We must maintain the mflags based on the set of flags for
1041 	 * all the urdc's that are chained up.
1042 	 */
1043 
1044 	/*
1045 	 * First look through all the urdc's and remove bits from
1046 	 * the 'flags' variable that are in use elsewhere.
1047 	 */
1048 
1049 	for (krdc = krdc->many_next; krdc != this; krdc = krdc->many_next) {
1050 		utmp = &rdc_u_info[krdc->index];
1051 		if (!IS_ENABLED(utmp))
1052 			continue;
1053 		flags &= ~(rdc_get_vflags(utmp) & RDC_MFLAGS);
1054 		if (flags == 0)
1055 			break;
1056 	}
1057 
1058 	/*
1059 	 * Now clear flags as necessary.
1060 	 */
1061 
1062 	if (flags != 0) {
1063 		urdc->mflags &= ~flags;
1064 		for (krdc = krdc->many_next; krdc != this;
1065 		    krdc = krdc->many_next) {
1066 			utmp = &rdc_u_info[krdc->index];
1067 			if (!IS_ENABLED(utmp))
1068 				continue;
1069 			utmp->mflags &= ~flags;
1070 		}
1071 	}
1072 }
1073 
1074 
1075 int
1076 rdc_get_mflags(rdc_u_info_t *urdc)
1077 {
1078 	return (urdc->mflags);
1079 }
1080 
1081 
1082 void
1083 rdc_set_flags_log(rdc_u_info_t *urdc, int flags, char *why)
1084 {
1085 	DTRACE_PROBE2(rdc_set_flags_log, int, urdc->index, int, flags);
1086 
1087 	rdc_set_flags(urdc, flags);
1088 
1089 	if (why == NULL)
1090 		return;
1091 
1092 	if (flags & RDC_LOGGING)
1093 		cmn_err(CE_NOTE, "!sndr: %s:%s entered logging mode: %s",
1094 		    urdc->secondary.intf, urdc->secondary.file, why);
1095 	if (flags & RDC_VOL_FAILED)
1096 		cmn_err(CE_NOTE, "!sndr: %s:%s volume failed: %s",
1097 		    urdc->secondary.intf, urdc->secondary.file, why);
1098 	if (flags & RDC_BMP_FAILED)
1099 		cmn_err(CE_NOTE, "!sndr: %s:%s bitmap failed: %s",
1100 		    urdc->secondary.intf, urdc->secondary.file, why);
1101 }
/*
 * rdc_lor(source, dest, len)
 *
 * Bitwise-OR the first 'len' bytes at 'source' into the bytes at 'dest',
 * leaving the result in 'dest'.  Nothing is done when 'source' is NULL or
 * 'len' is not positive.
 */
void
rdc_lor(const uchar_t *source, uchar_t *dest, int len)
{
	int pos;

	if (source == NULL)
		return;

	for (pos = 0; pos < len; pos++)
		dest[pos] |= source[pos];
}
1117 
1118 
1119 static int
1120 check_filesize(int index, spcs_s_info_t kstatus)
1121 {
1122 	uint64_t remote_size;
1123 	char tmp1[16], tmp2[16];
1124 	rdc_u_info_t *urdc = &rdc_u_info[index];
1125 	int status;
1126 
1127 	status = rdc_net_getsize(index, &remote_size);
1128 	if (status) {
1129 		(void) spcs_s_inttostring(status, tmp1, sizeof (tmp1), 0);
1130 		spcs_s_add(kstatus, RDC_EGETSIZE, urdc->secondary.intf,
1131 		    urdc->secondary.file, tmp1);
1132 		(void) rdc_net_state(index, CCIO_ENABLELOG);
1133 		return (RDC_EGETSIZE);
1134 	}
1135 	if (remote_size < (unsigned long long)urdc->volume_size) {
1136 		(void) spcs_s_inttostring(
1137 		    urdc->volume_size, tmp1, sizeof (tmp1), 0);
1138 		/*
1139 		 * Cheat, and covert to int, until we have
1140 		 * spcs_s_unsignedlonginttostring().
1141 		 */
1142 		status = (int)remote_size;
1143 		(void) spcs_s_inttostring(status, tmp2, sizeof (tmp2), 0);
1144 		spcs_s_add(kstatus, RDC_ESIZE, urdc->primary.intf,
1145 		    urdc->primary.file, tmp1, urdc->secondary.intf,
1146 		    urdc->secondary.file, tmp2);
1147 		(void) rdc_net_state(index, CCIO_ENABLELOG);
1148 		return (RDC_ESIZE);
1149 	}
1150 	return (0);
1151 }
1152 
1153 
1154 static void
1155 rdc_volume_update_svc(intptr_t arg)
1156 {
1157 	rdc_update_t *update = (rdc_update_t *)arg;
1158 	rdc_k_info_t *krdc;
1159 	rdc_k_info_t *this;
1160 	rdc_u_info_t *urdc;
1161 	struct net_bdata6 bd;
1162 	int index;
1163 	int rc;
1164 
1165 #ifdef DEBUG_IIUPDATE
1166 	cmn_err(CE_NOTE, "!SNDR received update request for %s",
1167 	    update->volume);
1168 #endif
1169 
1170 	if ((update->protocol != RDC_SVC_ONRETURN) &&
1171 	    (update->protocol != RDC_SVC_VOL_ENABLED)) {
1172 		/* don't understand what the client intends to do */
1173 		update->denied = 1;
1174 		spcs_s_add(update->status, RDC_EVERSION);
1175 		return;
1176 	}
1177 
1178 	index = rdc_lookup_enabled(update->volume, 0);
1179 	if (index < 0)
1180 		return;
1181 
1182 	/*
1183 	 * warn II that this volume is in use by sndr so
1184 	 * II can validate the sizes of the master vs shadow
1185 	 * and avoid trouble later down the line with
1186 	 * size mis-matches between urdc->volume_size and
1187 	 * what is returned from nsc_partsize() which may
1188 	 * be the size of the master when replicating the shadow
1189 	 */
1190 	if (update->protocol == RDC_SVC_VOL_ENABLED) {
1191 		if (index >= 0)
1192 			update->denied = 1;
1193 		return;
1194 	}
1195 
1196 	krdc = &rdc_k_info[index];
1197 	urdc = &rdc_u_info[index];
1198 	this = krdc;
1199 
1200 	do {
1201 		if (!(rdc_get_vflags(urdc) & RDC_LOGGING)) {
1202 #ifdef DEBUG_IIUPDATE
1203 		cmn_err(CE_NOTE, "!SNDR refused update request for %s",
1204 		    update->volume);
1205 #endif
1206 		update->denied = 1;
1207 		spcs_s_add(update->status, RDC_EMIRRORUP);
1208 		return;
1209 		}
1210 		/* 1->many - all must be logging */
1211 		if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
1212 			rdc_many_enter(krdc);
1213 			for (krdc = krdc->many_next; krdc != this;
1214 			    krdc = krdc->many_next) {
1215 				urdc = &rdc_u_info[krdc->index];
1216 				if (!IS_ENABLED(urdc))
1217 					continue;
1218 				break;
1219 			}
1220 			rdc_many_exit(krdc);
1221 		}
1222 	} while (krdc != this);
1223 
1224 #ifdef DEBUG_IIUPDATE
1225 	cmn_err(CE_NOTE, "!SNDR allowed update request for %s", update->volume);
1226 #endif
1227 	urdc = &rdc_u_info[krdc->index];
1228 	do {
1229 
1230 		bd.size = min(krdc->bitmap_size, (nsc_size_t)update->size);
1231 		bd.data.data_val = (char *)update->bitmap;
1232 		bd.offset = 0;
1233 		bd.cd = index;
1234 
1235 		if ((rc = RDC_OR_BITMAP(&bd)) != 0) {
1236 			update->denied = 1;
1237 			spcs_s_add(update->status, rc);
1238 			return;
1239 		}
1240 		urdc = &rdc_u_info[index];
1241 		urdc->bits_set = RDC_COUNT_BITMAP(krdc);
1242 		if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
1243 			rdc_many_enter(krdc);
1244 			for (krdc = krdc->many_next; krdc != this;
1245 			    krdc = krdc->many_next) {
1246 				index = krdc->index;
1247 				if (!IS_ENABLED(urdc))
1248 					continue;
1249 				break;
1250 			}
1251 			rdc_many_exit(krdc);
1252 		}
1253 	} while (krdc != this);
1254 
1255 
1256 	/* II (or something else) has updated us, so no need for a sync */
1257 	if (rdc_get_vflags(urdc) & (RDC_SYNC_NEEDED | RDC_RSYNC_NEEDED)) {
1258 		rdc_many_enter(krdc);
1259 		rdc_clr_flags(urdc, RDC_SYNC_NEEDED | RDC_RSYNC_NEEDED);
1260 		rdc_many_exit(krdc);
1261 	}
1262 
1263 	if (krdc->bitmap_write > 0)
1264 		(void) rdc_write_bitmap(krdc);
1265 }
1266 
1267 
1268 /*
1269  * rdc_check()
1270  *
1271  * Return 0 if the set is configured, enabled and the supplied
1272  * addressing information matches the in-kernel config, otherwise
1273  * return 1.
1274  */
1275 static int
1276 rdc_check(rdc_k_info_t *krdc, rdc_set_t *rdc_set)
1277 {
1278 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1279 
1280 	ASSERT(MUTEX_HELD(&krdc->group->lock));
1281 
1282 	if (!IS_ENABLED(urdc))
1283 		return (1);
1284 
1285 	if (strncmp(urdc->primary.file, rdc_set->primary.file,
1286 	    NSC_MAXPATH) != 0) {
1287 #ifdef DEBUG
1288 		cmn_err(CE_WARN, "!rdc_check: primary file mismatch %s vs %s",
1289 		    urdc->primary.file, rdc_set->primary.file);
1290 #endif
1291 		return (1);
1292 	}
1293 
1294 	if (rdc_set->primary.addr.len != 0 &&
1295 	    bcmp(urdc->primary.addr.buf, rdc_set->primary.addr.buf,
1296 	    urdc->primary.addr.len) != 0) {
1297 #ifdef DEBUG
1298 		cmn_err(CE_WARN, "!rdc_check: primary address mismatch for %s",
1299 		    urdc->primary.file);
1300 #endif
1301 		return (1);
1302 	}
1303 
1304 	if (strncmp(urdc->secondary.file, rdc_set->secondary.file,
1305 	    NSC_MAXPATH) != 0) {
1306 #ifdef DEBUG
1307 		cmn_err(CE_WARN, "!rdc_check: secondary file mismatch %s vs %s",
1308 		    urdc->secondary.file, rdc_set->secondary.file);
1309 #endif
1310 		return (1);
1311 	}
1312 
1313 	if (rdc_set->secondary.addr.len != 0 &&
1314 	    bcmp(urdc->secondary.addr.buf, rdc_set->secondary.addr.buf,
1315 	    urdc->secondary.addr.len) != 0) {
1316 #ifdef DEBUG
1317 		cmn_err(CE_WARN, "!rdc_check: secondary addr mismatch for %s",
1318 		    urdc->secondary.file);
1319 #endif
1320 		return (1);
1321 	}
1322 
1323 	return (0);
1324 }
1325 
1326 
1327 /*
1328  * Lookup enabled sets for a bitmap match
1329  */
1330 
1331 int
1332 rdc_lookup_bitmap(char *pathname)
1333 {
1334 	rdc_u_info_t *urdc;
1335 #ifdef DEBUG
1336 	rdc_k_info_t *krdc;
1337 #endif
1338 	int index;
1339 
1340 	for (index = 0; index < rdc_max_sets; index++) {
1341 		urdc = &rdc_u_info[index];
1342 #ifdef DEBUG
1343 		krdc = &rdc_k_info[index];
1344 #endif
1345 		ASSERT(krdc->index == index);
1346 		ASSERT(urdc->index == index);
1347 
1348 		if (!IS_ENABLED(urdc))
1349 			continue;
1350 
1351 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
1352 			if (strncmp(pathname, urdc->primary.bitmap,
1353 			    NSC_MAXPATH) == 0)
1354 				return (index);
1355 		} else {
1356 			if (strncmp(pathname, urdc->secondary.bitmap,
1357 			    NSC_MAXPATH) == 0)
1358 				return (index);
1359 		}
1360 	}
1361 
1362 	return (-1);
1363 }
1364 
1365 
1366 /*
1367  * Translate a pathname to index into rdc_k_info[].
1368  * Returns first match that is enabled.
1369  */
1370 
1371 int
1372 rdc_lookup_enabled(char *pathname, int allow_disabling)
1373 {
1374 	rdc_u_info_t *urdc;
1375 	rdc_k_info_t *krdc;
1376 	int index;
1377 
1378 restart:
1379 	for (index = 0; index < rdc_max_sets; index++) {
1380 		urdc = &rdc_u_info[index];
1381 		krdc = &rdc_k_info[index];
1382 
1383 		ASSERT(krdc->index == index);
1384 		ASSERT(urdc->index == index);
1385 
1386 		if (!IS_ENABLED(urdc))
1387 			continue;
1388 
1389 		if (allow_disabling == 0 && krdc->type_flag & RDC_UNREGISTER)
1390 			continue;
1391 
1392 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
1393 			if (strncmp(pathname, urdc->primary.file,
1394 			    NSC_MAXPATH) == 0)
1395 				return (index);
1396 		} else {
1397 			if (strncmp(pathname, urdc->secondary.file,
1398 			    NSC_MAXPATH) == 0)
1399 				return (index);
1400 		}
1401 	}
1402 
1403 	if (allow_disabling == 0) {
1404 		/* None found, or only a disabling one found, so try again */
1405 		allow_disabling = 1;
1406 		goto restart;
1407 	}
1408 
1409 	return (-1);
1410 }
1411 
1412 
1413 /*
1414  * Translate a pathname to index into rdc_k_info[].
1415  * Returns first match that is configured.
1416  *
1417  * Used by enable & resume code.
1418  * Must be called with rdc_conf_lock held.
1419  */
1420 
1421 int
1422 rdc_lookup_configured(char *pathname)
1423 {
1424 	rdc_u_info_t *urdc;
1425 	rdc_k_info_t *krdc;
1426 	int index;
1427 
1428 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1429 
1430 	for (index = 0; index < rdc_max_sets; index++) {
1431 		urdc = &rdc_u_info[index];
1432 		krdc = &rdc_k_info[index];
1433 
1434 		ASSERT(krdc->index == index);
1435 		ASSERT(urdc->index == index);
1436 
1437 		if (!IS_CONFIGURED(krdc))
1438 			continue;
1439 
1440 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
1441 			if (strncmp(pathname, urdc->primary.file,
1442 			    NSC_MAXPATH) == 0)
1443 				return (index);
1444 		} else {
1445 			if (strncmp(pathname, urdc->secondary.file,
1446 			    NSC_MAXPATH) == 0)
1447 				return (index);
1448 		}
1449 	}
1450 
1451 	return (-1);
1452 }
1453 
1454 
1455 /*
1456  * Looks up a configured set with matching secondary interface:volume
1457  * to check for illegal many-to-one volume configs.  To be used during
1458  * enable and resume processing.
1459  *
1460  * Must be called with rdc_conf_lock held.
1461  */
1462 
1463 static int
1464 rdc_lookup_many2one(rdc_set_t *rdc_set)
1465 {
1466 	rdc_u_info_t *urdc;
1467 	rdc_k_info_t *krdc;
1468 	int index;
1469 
1470 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1471 
1472 	for (index = 0; index < rdc_max_sets; index++) {
1473 		urdc = &rdc_u_info[index];
1474 		krdc = &rdc_k_info[index];
1475 
1476 		if (!IS_CONFIGURED(krdc))
1477 			continue;
1478 
1479 		if (strncmp(urdc->secondary.file,
1480 		    rdc_set->secondary.file, NSC_MAXPATH) != 0)
1481 			continue;
1482 		if (strncmp(urdc->secondary.intf,
1483 		    rdc_set->secondary.intf, MAX_RDC_HOST_SIZE) != 0)
1484 			continue;
1485 
1486 		break;
1487 	}
1488 
1489 	if (index < rdc_max_sets)
1490 		return (index);
1491 	else
1492 		return (-1);
1493 }
1494 
1495 
1496 /*
1497  * Looks up an rdc set to check if it is already configured, to be used from
1498  * functions called from the config ioctl where the interface names can be
1499  * used for comparison.
1500  *
1501  * Must be called with rdc_conf_lock held.
1502  */
1503 
1504 int
1505 rdc_lookup_byname(rdc_set_t *rdc_set)
1506 {
1507 	rdc_u_info_t *urdc;
1508 	rdc_k_info_t *krdc;
1509 	int index;
1510 
1511 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1512 
1513 	for (index = 0; index < rdc_max_sets; index++) {
1514 		urdc = &rdc_u_info[index];
1515 		krdc = &rdc_k_info[index];
1516 
1517 		ASSERT(krdc->index == index);
1518 		ASSERT(urdc->index == index);
1519 
1520 		if (!IS_CONFIGURED(krdc))
1521 			continue;
1522 
1523 		if (strncmp(urdc->primary.file, rdc_set->primary.file,
1524 		    NSC_MAXPATH) != 0)
1525 			continue;
1526 		if (strncmp(urdc->primary.intf, rdc_set->primary.intf,
1527 		    MAX_RDC_HOST_SIZE) != 0)
1528 			continue;
1529 		if (strncmp(urdc->secondary.file, rdc_set->secondary.file,
1530 		    NSC_MAXPATH) != 0)
1531 			continue;
1532 		if (strncmp(urdc->secondary.intf, rdc_set->secondary.intf,
1533 		    MAX_RDC_HOST_SIZE) != 0)
1534 			continue;
1535 
1536 		break;
1537 	}
1538 
1539 	if (index < rdc_max_sets)
1540 		return (index);
1541 	else
1542 		return (-1);
1543 }
1544 
1545 /*
1546  * Looks up a secondary hostname and device, to be used from
1547  * functions called from the config ioctl where the interface names can be
1548  * used for comparison.
1549  *
1550  * Must be called with rdc_conf_lock held.
1551  */
1552 
1553 int
1554 rdc_lookup_byhostdev(char *intf, char *file)
1555 {
1556 	rdc_u_info_t *urdc;
1557 	rdc_k_info_t *krdc;
1558 	int index;
1559 
1560 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1561 
1562 	for (index = 0; index < rdc_max_sets; index++) {
1563 		urdc = &rdc_u_info[index];
1564 		krdc = &rdc_k_info[index];
1565 
1566 		ASSERT(krdc->index == index);
1567 		ASSERT(urdc->index == index);
1568 
1569 		if (!IS_CONFIGURED(krdc))
1570 			continue;
1571 
1572 		if (strncmp(urdc->secondary.file, file,
1573 		    NSC_MAXPATH) != 0)
1574 			continue;
1575 		if (strncmp(urdc->secondary.intf, intf,
1576 		    MAX_RDC_HOST_SIZE) != 0)
1577 			continue;
1578 		break;
1579 	}
1580 
1581 	if (index < rdc_max_sets)
1582 		return (index);
1583 	else
1584 		return (-1);
1585 }
1586 
1587 
1588 /*
1589  * Looks up an rdc set to see if it is currently enabled, to be used on the
1590  * server so that the interface addresses must be used for comparison, as
1591  * the interface names may differ from those used on the client.
1592  *
1593  */
1594 
1595 int
1596 rdc_lookup_byaddr(rdc_set_t *rdc_set)
1597 {
1598 	rdc_u_info_t *urdc;
1599 #ifdef DEBUG
1600 	rdc_k_info_t *krdc;
1601 #endif
1602 	int index;
1603 
1604 	for (index = 0; index < rdc_max_sets; index++) {
1605 		urdc = &rdc_u_info[index];
1606 #ifdef DEBUG
1607 		krdc = &rdc_k_info[index];
1608 #endif
1609 		ASSERT(krdc->index == index);
1610 		ASSERT(urdc->index == index);
1611 
1612 		if (!IS_ENABLED(urdc))
1613 			continue;
1614 
1615 		if (strcmp(urdc->primary.file, rdc_set->primary.file) != 0)
1616 			continue;
1617 
1618 		if (strcmp(urdc->secondary.file, rdc_set->secondary.file) != 0)
1619 			continue;
1620 
1621 		if (bcmp(urdc->primary.addr.buf, rdc_set->primary.addr.buf,
1622 		    urdc->primary.addr.len) != 0) {
1623 			continue;
1624 		}
1625 
1626 		if (bcmp(urdc->secondary.addr.buf, rdc_set->secondary.addr.buf,
1627 		    urdc->secondary.addr.len) != 0) {
1628 			continue;
1629 		}
1630 
1631 		break;
1632 	}
1633 
1634 	if (index < rdc_max_sets)
1635 		return (index);
1636 	else
1637 		return (-1);
1638 }
1639 
1640 
1641 /*
1642  * Return index of first multihop or 1-to-many
1643  * Behavior controlled by setting ismany.
1644  * ismany TRUE (one-to-many)
1645  * ismany FALSE (multihops)
1646  *
1647  */
1648 static int
1649 rdc_lookup_multimany(rdc_k_info_t *krdc, const int ismany)
1650 {
1651 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1652 	rdc_u_info_t *utmp;
1653 	rdc_k_info_t *ktmp;
1654 	char *pathname;
1655 	int index;
1656 	int role;
1657 
1658 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1659 	ASSERT(MUTEX_HELD(&rdc_many_lock));
1660 
1661 	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
1662 		/* this host is the primary of the krdc set */
1663 		pathname = urdc->primary.file;
1664 		if (ismany) {
1665 			/*
1666 			 * 1-many sets are linked by primary :
1667 			 * look for matching primary on this host
1668 			 */
1669 			role = RDC_PRIMARY;
1670 		} else {
1671 			/*
1672 			 * multihop sets link primary to secondary :
1673 			 * look for matching secondary on this host
1674 			 */
1675 			role = 0;
1676 		}
1677 	} else {
1678 		/* this host is the secondary of the krdc set */
1679 		pathname = urdc->secondary.file;
1680 		if (ismany) {
1681 			/*
1682 			 * 1-many sets are linked by primary, so if
1683 			 * this host is the secondary of the set this
1684 			 * cannot require 1-many linkage.
1685 			 */
1686 			return (-1);
1687 		} else {
1688 			/*
1689 			 * multihop sets link primary to secondary :
1690 			 * look for matching primary on this host
1691 			 */
1692 			role = RDC_PRIMARY;
1693 		}
1694 	}
1695 
1696 	for (index = 0; index < rdc_max_sets; index++) {
1697 		utmp = &rdc_u_info[index];
1698 		ktmp = &rdc_k_info[index];
1699 
1700 		if (!IS_CONFIGURED(ktmp)) {
1701 			continue;
1702 		}
1703 
1704 		if (role == RDC_PRIMARY) {
1705 			/*
1706 			 * Find a primary that is this host and is not
1707 			 * krdc but shares the same data volume as krdc.
1708 			 */
1709 			if ((rdc_get_vflags(utmp) & RDC_PRIMARY) &&
1710 			    strncmp(utmp->primary.file, pathname,
1711 			    NSC_MAXPATH) == 0 && (krdc != ktmp)) {
1712 				break;
1713 			}
1714 		} else {
1715 			/*
1716 			 * Find a secondary that is this host and is not
1717 			 * krdc but shares the same data volume as krdc.
1718 			 */
1719 			if (!(rdc_get_vflags(utmp) & RDC_PRIMARY) &&
1720 			    strncmp(utmp->secondary.file, pathname,
1721 			    NSC_MAXPATH) == 0 && (krdc != ktmp)) {
1722 				break;
1723 			}
1724 		}
1725 	}
1726 
1727 	if (index < rdc_max_sets)
1728 		return (index);
1729 	else
1730 		return (-1);
1731 }
1732 
1733 /*
1734  * Returns secondary match that is configured.
1735  *
1736  * Used by enable & resume code.
1737  * Must be called with rdc_conf_lock held.
1738  */
1739 
1740 static int
1741 rdc_lookup_secondary(char *pathname)
1742 {
1743 	rdc_u_info_t *urdc;
1744 	rdc_k_info_t *krdc;
1745 	int index;
1746 
1747 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1748 
1749 	for (index = 0; index < rdc_max_sets; index++) {
1750 		urdc = &rdc_u_info[index];
1751 		krdc = &rdc_k_info[index];
1752 
1753 		ASSERT(krdc->index == index);
1754 		ASSERT(urdc->index == index);
1755 
1756 		if (!IS_CONFIGURED(krdc))
1757 			continue;
1758 
1759 		if (!IS_STATE(urdc, RDC_PRIMARY)) {
1760 			if (strncmp(pathname, urdc->secondary.file,
1761 			    NSC_MAXPATH) == 0)
1762 			return (index);
1763 		}
1764 	}
1765 
1766 	return (-1);
1767 }
1768 
1769 
1770 static nsc_fd_t *
1771 rdc_open_direct(rdc_k_info_t *krdc)
1772 {
1773 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1774 	int rc;
1775 
1776 	if (krdc->remote_fd == NULL)
1777 		krdc->remote_fd = nsc_open(urdc->direct_file,
1778 		    NSC_RDCHR_ID|NSC_DEVICE|NSC_RDWR, 0, 0, &rc);
1779 	return (krdc->remote_fd);
1780 }
1781 
1782 static void
1783 rdc_close_direct(rdc_k_info_t *krdc)
1784 {
1785 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1786 
1787 	urdc->direct_file[0] = 0;
1788 	if (krdc->remote_fd) {
1789 		if (nsc_close(krdc->remote_fd) == 0) {
1790 			krdc->remote_fd = NULL;
1791 		}
1792 	}
1793 }
1794 
1795 
#ifdef DEBUG_MANY
/*
 * print_many()
 *
 * Debug aid: walk the many_next chain beginning at start and log each
 * krdc together with its primary/secondary volume names and its
 * many_next and multi_next pointers.  The walk stops when it returns to
 * start or reaches a NULL many_next.
 */
static void
print_many(rdc_k_info_t *start)
{
	rdc_k_info_t *p = start;
	rdc_u_info_t *q;

	do {
		/*
		 * Look up the urdc only once p is known to be usable, so a
		 * NULL many_next ends the walk instead of being dereferenced.
		 */
		q = &rdc_u_info[p->index];
		cmn_err(CE_CONT, "!krdc %p, %s %s (many_nxt %p multi_nxt %p)\n",
		    (void *)p, q->primary.file, q->secondary.file,
		    (void *)p->many_next, (void *)p->multi_next);
		delay(10);
		p = p->many_next;
	} while (p != NULL && p != start);
}
#endif /* DEBUG_MANY */
1813 
1814 
1815 static int
1816 add_to_multi(rdc_k_info_t *krdc)
1817 {
1818 	rdc_u_info_t *urdc;
1819 	rdc_k_info_t *ktmp;
1820 	rdc_u_info_t *utmp;
1821 	int mindex;
1822 	int domulti;
1823 
1824 	urdc = &rdc_u_info[krdc->index];
1825 
1826 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1827 	ASSERT(MUTEX_HELD(&rdc_many_lock));
1828 
1829 	/* Now find companion krdc */
1830 	mindex = rdc_lookup_multimany(krdc, FALSE);
1831 
1832 #ifdef DEBUG_MANY
1833 	cmn_err(CE_NOTE,
1834 	    "!add_to_multi: lookup_multimany: mindex %d prim %s sec %s",
1835 	    mindex, urdc->primary.file, urdc->secondary.file);
1836 #endif
1837 
1838 	if (mindex >= 0) {
1839 		ktmp = &rdc_k_info[mindex];
1840 		utmp = &rdc_u_info[mindex];
1841 
1842 		domulti = 1;
1843 
1844 		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
1845 		    ktmp->multi_next != NULL) {
1846 			/*
1847 			 * We are adding a new primary to a many
1848 			 * group that is the target of a multihop, just
1849 			 * ignore it since we are linked in elsewhere.
1850 			 */
1851 			domulti = 0;
1852 		}
1853 
1854 		if (domulti) {
1855 			if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
1856 				/* Is previous leg using direct file I/O? */
1857 				if (utmp->direct_file[0] != 0) {
1858 					/* It is, so cannot proceed */
1859 					return (-1);
1860 				}
1861 			} else {
1862 				/* Is this leg using direct file I/O? */
1863 				if (urdc->direct_file[0] != 0) {
1864 					/* It is, so cannot proceed */
1865 					return (-1);
1866 				}
1867 			}
1868 			krdc->multi_next = ktmp;
1869 			ktmp->multi_next = krdc;
1870 		}
1871 	} else {
1872 		krdc->multi_next = NULL;
1873 #ifdef DEBUG_MANY
1874 		cmn_err(CE_NOTE, "!add_to_multi: NULL multi_next index %d",
1875 		    krdc->index);
1876 #endif
1877 	}
1878 
1879 	return (0);
1880 }
1881 
1882 
1883 /*
1884  * Add a new set to the circular list of 1-to-many primaries and chain
1885  * up any multihop as well.
1886  */
1887 static int
1888 add_to_many(rdc_k_info_t *krdc)
1889 {
1890 	rdc_k_info_t *okrdc;
1891 	int oindex;
1892 
1893 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1894 
1895 	rdc_many_enter(krdc);
1896 
1897 	if (add_to_multi(krdc) < 0) {
1898 		rdc_many_exit(krdc);
1899 		return (-1);
1900 	}
1901 
1902 	oindex = rdc_lookup_multimany(krdc, TRUE);
1903 	if (oindex < 0) {
1904 #ifdef DEBUG_MANY
1905 		print_many(krdc);
1906 #endif
1907 		rdc_many_exit(krdc);
1908 		return (0);
1909 	}
1910 
1911 	okrdc = &rdc_k_info[oindex];
1912 
1913 #ifdef DEBUG_MANY
1914 	print_many(okrdc);
1915 #endif
1916 	krdc->many_next = okrdc->many_next;
1917 	okrdc->many_next = krdc;
1918 
1919 #ifdef DEBUG_MANY
1920 	print_many(okrdc);
1921 #endif
1922 	rdc_many_exit(krdc);
1923 	return (0);
1924 }
1925 
1926 
1927 /*
1928  * Remove a set from the circular list of 1-to-many primaries.
1929  */
1930 static void
1931 remove_from_many(rdc_k_info_t *old)
1932 {
1933 	rdc_u_info_t *uold = &rdc_u_info[old->index];
1934 	rdc_k_info_t *p, *q;
1935 
1936 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
1937 
1938 	rdc_many_enter(old);
1939 
1940 #ifdef DEBUG_MANY
1941 	cmn_err(CE_NOTE, "!rdc: before remove_from_many");
1942 	print_many(old);
1943 #endif
1944 
1945 	if (old->many_next == old) {
1946 		/* remove from multihop */
1947 		if ((q = old->multi_next) != NULL) {
1948 			ASSERT(q->multi_next == old);
1949 			q->multi_next = NULL;
1950 			old->multi_next = NULL;
1951 		}
1952 
1953 		rdc_many_exit(old);
1954 		return;
1955 	}
1956 
1957 	/* search */
1958 	for (p = old->many_next; p->many_next != old; p = p->many_next)
1959 	;
1960 
1961 	p->many_next = old->many_next;
1962 	old->many_next = old;
1963 
1964 	if ((q = old->multi_next) != NULL) {
1965 		/*
1966 		 * old was part of a multihop, so switch multi pointers
1967 		 * to someone remaining on the many chain
1968 		 */
1969 		ASSERT(p->multi_next == NULL);
1970 
1971 		q->multi_next = p;
1972 		p->multi_next = q;
1973 		old->multi_next = NULL;
1974 	}
1975 
1976 #ifdef DEBUG_MANY
1977 	if (p == old) {
1978 		cmn_err(CE_NOTE, "!rdc: after remove_from_many empty");
1979 	} else {
1980 		cmn_err(CE_NOTE, "!rdc: after remove_from_many");
1981 		print_many(p);
1982 	}
1983 #endif
1984 
1985 	rdc_clr_mflags(&rdc_u_info[p->index],
1986 	    (rdc_get_vflags(uold) & RDC_MFLAGS));
1987 
1988 	rdc_many_exit(old);
1989 }
1990 
1991 
1992 static int
1993 _rdc_enable(rdc_set_t *rdc_set, int options, spcs_s_info_t kstatus)
1994 {
1995 	int index;
1996 	char *rhost;
1997 	struct netbuf *addrp;
1998 	rdc_k_info_t *krdc;
1999 	rdc_u_info_t *urdc;
2000 	rdc_srv_t *svp = NULL;
2001 	char *local_file;
2002 	char *local_bitmap;
2003 	char *diskq;
2004 	int rc;
2005 	nsc_size_t maxfbas;
2006 	rdc_group_t *grp;
2007 
2008 	if ((rdc_set->primary.intf[0] == 0) ||
2009 	    (rdc_set->primary.addr.len == 0) ||
2010 	    (rdc_set->primary.file[0] == 0) ||
2011 	    (rdc_set->primary.bitmap[0] == 0) ||
2012 	    (rdc_set->secondary.intf[0] == 0) ||
2013 	    (rdc_set->secondary.addr.len == 0) ||
2014 	    (rdc_set->secondary.file[0] == 0) ||
2015 	    (rdc_set->secondary.bitmap[0] == 0)) {
2016 		spcs_s_add(kstatus, RDC_EEMPTY);
2017 		return (RDC_EEMPTY);
2018 	}
2019 
2020 	/* Next check there aren't any enabled rdc sets which match. */
2021 
2022 	mutex_enter(&rdc_conf_lock);
2023 
2024 	if (rdc_lookup_byname(rdc_set) >= 0) {
2025 		mutex_exit(&rdc_conf_lock);
2026 		spcs_s_add(kstatus, RDC_EENABLED, rdc_set->primary.intf,
2027 		    rdc_set->primary.file, rdc_set->secondary.intf,
2028 		    rdc_set->secondary.file);
2029 		return (RDC_EENABLED);
2030 	}
2031 
2032 	if (rdc_lookup_many2one(rdc_set) >= 0) {
2033 		mutex_exit(&rdc_conf_lock);
2034 		spcs_s_add(kstatus, RDC_EMANY2ONE, rdc_set->primary.intf,
2035 		    rdc_set->primary.file, rdc_set->secondary.intf,
2036 		    rdc_set->secondary.file);
2037 		return (RDC_EMANY2ONE);
2038 	}
2039 
2040 	if (rdc_set->netconfig->knc_proto == NULL) {
2041 		mutex_exit(&rdc_conf_lock);
2042 		spcs_s_add(kstatus, RDC_ENETCONFIG);
2043 		return (RDC_ENETCONFIG);
2044 	}
2045 
2046 	if (rdc_set->primary.addr.len == 0) {
2047 		mutex_exit(&rdc_conf_lock);
2048 		spcs_s_add(kstatus, RDC_ENETBUF, rdc_set->primary.file);
2049 		return (RDC_ENETBUF);
2050 	}
2051 
2052 	if (rdc_set->secondary.addr.len == 0) {
2053 		mutex_exit(&rdc_conf_lock);
2054 		spcs_s_add(kstatus, RDC_ENETBUF, rdc_set->secondary.file);
2055 		return (RDC_ENETBUF);
2056 	}
2057 
2058 	/* Check that the local data volume isn't in use as a bitmap */
2059 	if (options & RDC_OPT_PRIMARY)
2060 		local_file = rdc_set->primary.file;
2061 	else
2062 		local_file = rdc_set->secondary.file;
2063 	if (rdc_lookup_bitmap(local_file) >= 0) {
2064 		mutex_exit(&rdc_conf_lock);
2065 		spcs_s_add(kstatus, RDC_EVOLINUSE, local_file);
2066 		return (RDC_EVOLINUSE);
2067 	}
2068 
2069 	/* check that the secondary data volume isn't in use */
2070 	if (!(options & RDC_OPT_PRIMARY)) {
2071 		local_file = rdc_set->secondary.file;
2072 		if (rdc_lookup_secondary(local_file) >= 0) {
2073 			mutex_exit(&rdc_conf_lock);
2074 			spcs_s_add(kstatus, RDC_EVOLINUSE, local_file);
2075 			return (RDC_EVOLINUSE);
2076 		}
2077 	}
2078 
2079 	/* check that the local data vol is not in use as a diskqueue */
2080 	if (options & RDC_OPT_PRIMARY) {
2081 		if (rdc_lookup_diskq(rdc_set->primary.file) >= 0) {
2082 			mutex_exit(&rdc_conf_lock);
2083 			spcs_s_add(kstatus,
2084 			    RDC_EVOLINUSE, rdc_set->primary.file);
2085 			return (RDC_EVOLINUSE);
2086 		}
2087 	}
2088 
2089 	/* Check that the bitmap isn't in use as a data volume */
2090 	if (options & RDC_OPT_PRIMARY)
2091 		local_bitmap = rdc_set->primary.bitmap;
2092 	else
2093 		local_bitmap = rdc_set->secondary.bitmap;
2094 	if (rdc_lookup_configured(local_bitmap) >= 0) {
2095 		mutex_exit(&rdc_conf_lock);
2096 		spcs_s_add(kstatus, RDC_EBMPINUSE, local_bitmap);
2097 		return (RDC_EBMPINUSE);
2098 	}
2099 
2100 	/* Check that the bitmap isn't already in use as a bitmap */
2101 	if (rdc_lookup_bitmap(local_bitmap) >= 0) {
2102 		mutex_exit(&rdc_conf_lock);
2103 		spcs_s_add(kstatus, RDC_EBMPINUSE, local_bitmap);
2104 		return (RDC_EBMPINUSE);
2105 	}
2106 
2107 	/* check that the diskq (if here) is not in use */
2108 	diskq = rdc_set->disk_queue;
2109 	if (diskq[0] && rdc_diskq_inuse(rdc_set, diskq)) {
2110 		mutex_exit(&rdc_conf_lock);
2111 		spcs_s_add(kstatus, RDC_EDISKQINUSE, diskq);
2112 		return (RDC_EDISKQINUSE);
2113 	}
2114 
2115 
2116 	/* Set urdc->volume_size */
2117 	index = rdc_dev_open(rdc_set, options);
2118 	if (index < 0) {
2119 		mutex_exit(&rdc_conf_lock);
2120 		if (options & RDC_OPT_PRIMARY)
2121 			spcs_s_add(kstatus, RDC_EOPEN, rdc_set->primary.intf,
2122 			    rdc_set->primary.file);
2123 		else
2124 			spcs_s_add(kstatus, RDC_EOPEN, rdc_set->secondary.intf,
2125 			    rdc_set->secondary.file);
2126 		return (RDC_EOPEN);
2127 	}
2128 
2129 	urdc = &rdc_u_info[index];
2130 	krdc = &rdc_k_info[index];
2131 
2132 	/* copy relevant parts of rdc_set to urdc field by field */
2133 
2134 	(void) strncpy(urdc->primary.intf, rdc_set->primary.intf,
2135 	    MAX_RDC_HOST_SIZE);
2136 	(void) strncpy(urdc->secondary.intf, rdc_set->secondary.intf,
2137 	    MAX_RDC_HOST_SIZE);
2138 
2139 	(void) strncpy(urdc->group_name, rdc_set->group_name, NSC_MAXPATH);
2140 	(void) strncpy(urdc->disk_queue, rdc_set->disk_queue, NSC_MAXPATH);
2141 
2142 	dup_rdc_netbuf(&rdc_set->primary.addr, &urdc->primary.addr);
2143 	(void) strncpy(urdc->primary.file, rdc_set->primary.file, NSC_MAXPATH);
2144 	(void) strncpy(urdc->primary.bitmap, rdc_set->primary.bitmap,
2145 	    NSC_MAXPATH);
2146 
2147 	dup_rdc_netbuf(&rdc_set->secondary.addr, &urdc->secondary.addr);
2148 	(void) strncpy(urdc->secondary.file, rdc_set->secondary.file,
2149 	    NSC_MAXPATH);
2150 	(void) strncpy(urdc->secondary.bitmap, rdc_set->secondary.bitmap,
2151 	    NSC_MAXPATH);
2152 
2153 	urdc->setid = rdc_set->setid;
2154 
2155 	/*
2156 	 * before we try to add to group, or create one, check out
2157 	 * if we are doing the wrong thing with the diskq
2158 	 */
2159 
2160 	if (urdc->disk_queue[0] && (options & RDC_OPT_SYNC)) {
2161 		mutex_exit(&rdc_conf_lock);
2162 		rdc_dev_close(krdc);
2163 		spcs_s_add(kstatus, RDC_EQWRONGMODE);
2164 		return (RDC_EQWRONGMODE);
2165 	}
2166 
2167 	if ((rc = add_to_group(krdc, options, RDC_CMD_ENABLE)) != 0) {
2168 		mutex_exit(&rdc_conf_lock);
2169 		rdc_dev_close(krdc);
2170 		if (rc == RDC_EQNOADD) {
2171 			spcs_s_add(kstatus, RDC_EQNOADD, rdc_set->disk_queue);
2172 			return (RDC_EQNOADD);
2173 		} else {
2174 			spcs_s_add(kstatus, RDC_EGROUP,
2175 			    rdc_set->primary.intf, rdc_set->primary.file,
2176 			    rdc_set->secondary.intf, rdc_set->secondary.file,
2177 			    rdc_set->group_name);
2178 			return (RDC_EGROUP);
2179 		}
2180 	}
2181 
2182 	/*
2183 	 * maxfbas was set in rdc_dev_open as primary's maxfbas.
2184 	 * If diskq's maxfbas is smaller, then use diskq's.
2185 	 */
2186 	grp = krdc->group;
2187 	if (grp && RDC_IS_DISKQ(grp) && (grp->diskqfd != 0)) {
2188 		rc = _rdc_rsrv_diskq(grp);
2189 		if (RDC_SUCCESS(rc)) {
2190 			rc = nsc_maxfbas(grp->diskqfd, 0, &maxfbas);
2191 			if (rc == 0) {
2192 #ifdef DEBUG
2193 				if (krdc->maxfbas != maxfbas)
2194 					cmn_err(CE_NOTE,
2195 					    "!_rdc_enable: diskq maxfbas = %"
2196 					    NSC_SZFMT ", primary maxfbas = %"
2197 					    NSC_SZFMT, maxfbas, krdc->maxfbas);
2198 #endif
2199 				krdc->maxfbas = min(krdc->maxfbas, maxfbas);
2200 			} else {
2201 				cmn_err(CE_WARN,
2202 				    "!_rdc_enable: diskq maxfbas failed (%d)",
2203 				    rc);
2204 			}
2205 			_rdc_rlse_diskq(grp);
2206 		} else {
2207 			cmn_err(CE_WARN,
2208 			    "!_rdc_enable: diskq reserve failed (%d)", rc);
2209 		}
2210 	}
2211 
2212 	rdc_init_flags(urdc);
2213 	(void) strncpy(urdc->direct_file, rdc_set->direct_file, NSC_MAXPATH);
2214 	if ((options & RDC_OPT_PRIMARY) && rdc_set->direct_file[0]) {
2215 		if (rdc_open_direct(krdc) == NULL)
2216 			rdc_set_flags(urdc, RDC_FCAL_FAILED);
2217 	}
2218 
2219 	krdc->many_next = krdc;
2220 
2221 	ASSERT(krdc->type_flag == 0);
2222 	krdc->type_flag = RDC_CONFIGURED;
2223 
2224 	if (options & RDC_OPT_PRIMARY)
2225 		rdc_set_flags(urdc, RDC_PRIMARY);
2226 
2227 	if (options & RDC_OPT_ASYNC)
2228 		krdc->type_flag |= RDC_ASYNCMODE;
2229 
2230 	set_busy(krdc);
2231 	urdc->syshostid = rdc_set->syshostid;
2232 
2233 	if (add_to_many(krdc) < 0) {
2234 		mutex_exit(&rdc_conf_lock);
2235 
2236 		rdc_group_enter(krdc);
2237 
2238 		spcs_s_add(kstatus, RDC_EMULTI);
2239 		rc = RDC_EMULTI;
2240 		goto fail;
2241 	}
2242 
2243 	/* Configured but not enabled */
2244 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2245 
2246 	mutex_exit(&rdc_conf_lock);
2247 
2248 	rdc_group_enter(krdc);
2249 
2250 	/* Configured but not enabled */
2251 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2252 
2253 	/*
2254 	 * The rdc set is configured but not yet enabled. Other operations must
2255 	 * ignore this set until it is enabled.
2256 	 */
2257 
2258 	urdc->sync_pos = 0;
2259 
2260 	if (rdc_set->maxqfbas > 0)
2261 		urdc->maxqfbas = rdc_set->maxqfbas;
2262 	else
2263 		urdc->maxqfbas = rdc_maxthres_queue;
2264 
2265 	if (rdc_set->maxqitems > 0)
2266 		urdc->maxqitems = rdc_set->maxqitems;
2267 	else
2268 		urdc->maxqitems = rdc_max_qitems;
2269 
2270 	if (rdc_set->asyncthr > 0)
2271 		urdc->asyncthr = rdc_set->asyncthr;
2272 	else
2273 		urdc->asyncthr = rdc_asyncthr;
2274 
2275 	if (urdc->autosync == -1) {
2276 		/* Still unknown */
2277 		if (rdc_set->autosync > 0)
2278 			urdc->autosync = 1;
2279 		else
2280 			urdc->autosync = 0;
2281 	}
2282 
2283 	urdc->netconfig = rdc_set->netconfig;
2284 
2285 	if (options & RDC_OPT_PRIMARY) {
2286 		rhost = rdc_set->secondary.intf;
2287 		addrp = &rdc_set->secondary.addr;
2288 	} else {
2289 		rhost = rdc_set->primary.intf;
2290 		addrp = &rdc_set->primary.addr;
2291 	}
2292 
2293 	if (options & RDC_OPT_ASYNC)
2294 		rdc_set_flags(urdc, RDC_ASYNC);
2295 
2296 	svp = rdc_create_svinfo(rhost, addrp, urdc->netconfig);
2297 	if (svp == NULL) {
2298 		spcs_s_add(kstatus, ENOMEM);
2299 		rc = ENOMEM;
2300 		goto fail;
2301 	}
2302 	urdc->netconfig = NULL;		/* This will be no good soon */
2303 
2304 	rdc_kstat_create(index);
2305 
2306 	/* Don't set krdc->intf here */
2307 
2308 	if (rdc_enable_bitmap(krdc, options & RDC_OPT_SETBMP) < 0)
2309 		goto bmpfail;
2310 
2311 	RDC_ZERO_BITREF(krdc);
2312 	if (krdc->lsrv == NULL)
2313 		krdc->lsrv = svp;
2314 	else {
2315 #ifdef DEBUG
2316 		cmn_err(CE_WARN, "!_rdc_enable: krdc->lsrv already set: %p",
2317 		    (void *) krdc->lsrv);
2318 #endif
2319 		rdc_destroy_svinfo(svp);
2320 	}
2321 	svp = NULL;
2322 
2323 	/* Configured but not enabled */
2324 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2325 
2326 	/* And finally */
2327 
2328 	krdc->remote_index = -1;
2329 	/* Should we set the whole group logging? */
2330 	rdc_set_flags(urdc, RDC_ENABLED | RDC_LOGGING);
2331 
2332 	rdc_group_exit(krdc);
2333 
2334 	if (rdc_intercept(krdc) != 0) {
2335 		rdc_group_enter(krdc);
2336 		rdc_clr_flags(urdc, RDC_ENABLED);
2337 		if (options & RDC_OPT_PRIMARY)
2338 			spcs_s_add(kstatus, RDC_EREGISTER, urdc->primary.file);
2339 		else
2340 			spcs_s_add(kstatus, RDC_EREGISTER,
2341 			    urdc->secondary.file);
2342 #ifdef DEBUG
2343 		cmn_err(CE_NOTE, "!nsc_register_path failed %s",
2344 		    urdc->primary.file);
2345 #endif
2346 		rc = RDC_EREGISTER;
2347 		goto bmpfail;
2348 	}
2349 #ifdef DEBUG
2350 	cmn_err(CE_NOTE, "!SNDR: enabled %s %s", urdc->primary.file,
2351 	    urdc->secondary.file);
2352 #endif
2353 
2354 	rdc_write_state(urdc);
2355 
2356 	mutex_enter(&rdc_conf_lock);
2357 	wakeup_busy(krdc);
2358 	mutex_exit(&rdc_conf_lock);
2359 
2360 	return (0);
2361 
2362 bmpfail:
2363 	if (options & RDC_OPT_PRIMARY)
2364 		spcs_s_add(kstatus, RDC_EBITMAP, rdc_set->primary.bitmap);
2365 	else
2366 		spcs_s_add(kstatus, RDC_EBITMAP, rdc_set->secondary.bitmap);
2367 	rc = RDC_EBITMAP;
2368 	if (rdc_get_vflags(urdc) & RDC_ENABLED) {
2369 		rdc_group_exit(krdc);
2370 		(void) rdc_unintercept(krdc);
2371 		rdc_group_enter(krdc);
2372 	}
2373 
2374 fail:
2375 	rdc_kstat_delete(index);
2376 	rdc_group_exit(krdc);
2377 	if (krdc->intf) {
2378 		rdc_if_t *ip = krdc->intf;
2379 		mutex_enter(&rdc_conf_lock);
2380 		krdc->intf = NULL;
2381 		rdc_remove_from_if(ip);
2382 		mutex_exit(&rdc_conf_lock);
2383 	}
2384 	rdc_group_enter(krdc);
2385 	/* Configured but not enabled */
2386 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2387 
2388 	rdc_dev_close(krdc);
2389 	rdc_close_direct(krdc);
2390 	rdc_destroy_svinfo(svp);
2391 
2392 	/* Configured but not enabled */
2393 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2394 
2395 	rdc_group_exit(krdc);
2396 
2397 	mutex_enter(&rdc_conf_lock);
2398 
2399 	/* Configured but not enabled */
2400 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2401 
2402 	remove_from_group(krdc);
2403 
2404 	if (IS_MANY(krdc) || IS_MULTI(krdc))
2405 		remove_from_many(krdc);
2406 
2407 	rdc_u_init(urdc);
2408 
2409 	ASSERT(krdc->type_flag & RDC_CONFIGURED);
2410 	krdc->type_flag = 0;
2411 	wakeup_busy(krdc);
2412 
2413 	mutex_exit(&rdc_conf_lock);
2414 
2415 	return (rc);
2416 }
2417 
2418 static int
2419 rdc_enable(rdc_config_t *uparms, spcs_s_info_t kstatus)
2420 {
2421 	int rc;
2422 	char itmp[10];
2423 
2424 	if (!(uparms->options & RDC_OPT_SYNC) &&
2425 	    !(uparms->options & RDC_OPT_ASYNC)) {
2426 		rc = RDC_EEINVAL;
2427 		(void) spcs_s_inttostring(
2428 		    uparms->options, itmp, sizeof (itmp), 1);
2429 		spcs_s_add(kstatus, RDC_EEINVAL, itmp);
2430 		goto done;
2431 	}
2432 
2433 	if (!(uparms->options & RDC_OPT_PRIMARY) &&
2434 	    !(uparms->options & RDC_OPT_SECONDARY)) {
2435 		rc = RDC_EEINVAL;
2436 		(void) spcs_s_inttostring(
2437 		    uparms->options, itmp, sizeof (itmp), 1);
2438 		spcs_s_add(kstatus, RDC_EEINVAL, itmp);
2439 		goto done;
2440 	}
2441 
2442 	if (!(uparms->options & RDC_OPT_SETBMP) &&
2443 	    !(uparms->options & RDC_OPT_CLRBMP)) {
2444 		rc = RDC_EEINVAL;
2445 		(void) spcs_s_inttostring(
2446 		    uparms->options, itmp, sizeof (itmp), 1);
2447 		spcs_s_add(kstatus, RDC_EEINVAL, itmp);
2448 		goto done;
2449 	}
2450 
2451 	rc = _rdc_enable(uparms->rdc_set, uparms->options, kstatus);
2452 done:
2453 	return (rc);
2454 }
2455 
2456 /* ARGSUSED */
2457 static int
2458 _rdc_disable(rdc_k_info_t *krdc, rdc_config_t *uap, spcs_s_info_t kstatus)
2459 {
2460 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2461 	rdc_if_t *ip;
2462 	int index = krdc->index;
2463 	disk_queue *q;
2464 	rdc_set_t *rdc_set = uap->rdc_set;
2465 
2466 	ASSERT(krdc->group != NULL);
2467 	rdc_group_enter(krdc);
2468 #ifdef DEBUG
2469 	ASSERT(rdc_check(krdc, rdc_set) == 0);
2470 #else
2471 	if (((uap->options & RDC_OPT_FORCE_DISABLE) == 0) &&
2472 	    rdc_check(krdc, rdc_set)) {
2473 		rdc_group_exit(krdc);
2474 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
2475 		    rdc_set->secondary.file);
2476 		return (RDC_EALREADY);
2477 	}
2478 #endif
2479 
2480 	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
2481 		halt_sync(krdc);
2482 		ASSERT(IS_ENABLED(urdc));
2483 	}
2484 	q = &krdc->group->diskq;
2485 
2486 	if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group) &&
2487 	    ((!IS_STATE(urdc, RDC_LOGGING)) && (!QEMPTY(q)))) {
2488 		krdc->type_flag &= ~RDC_DISABLEPEND;
2489 		rdc_group_exit(krdc);
2490 		spcs_s_add(kstatus, RDC_EQNOTEMPTY, urdc->disk_queue);
2491 		return (RDC_EQNOTEMPTY);
2492 	}
2493 	rdc_group_exit(krdc);
2494 	(void) rdc_unintercept(krdc);
2495 
2496 #ifdef DEBUG
2497 	cmn_err(CE_NOTE, "!SNDR: disabled %s %s", urdc->primary.file,
2498 	    urdc->secondary.file);
2499 #endif
2500 
2501 	/* Configured but not enabled */
2502 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2503 
2504 	/*
2505 	 * No new io can come in through the io provider.
2506 	 * Wait for the async flusher to finish.
2507 	 */
2508 
2509 	if (IS_ASYNC(urdc) && !RDC_IS_DISKQ(krdc->group)) {
2510 		int tries = 2; /* in case of hopelessly stuck flusher threads */
2511 #ifdef DEBUG
2512 		net_queue *qp = &krdc->group->ra_queue;
2513 #endif
2514 		do {
2515 			if (!krdc->group->rdc_writer)
2516 				(void) rdc_writer(krdc->index);
2517 
2518 			(void) rdc_drain_queue(krdc->index);
2519 
2520 		} while (krdc->group->rdc_writer && tries--);
2521 
2522 		/* ok, force it to happen... */
2523 		if (rdc_drain_queue(krdc->index) != 0) {
2524 			do {
2525 				mutex_enter(&krdc->group->ra_queue.net_qlock);
2526 				krdc->group->asyncdis = 1;
2527 				cv_broadcast(&krdc->group->asyncqcv);
2528 				mutex_exit(&krdc->group->ra_queue.net_qlock);
2529 				cmn_err(CE_WARN,
2530 				    "!SNDR: async I/O pending and not flushed "
2531 				    "for %s during disable",
2532 				    urdc->primary.file);
2533 #ifdef DEBUG
2534 				cmn_err(CE_WARN,
2535 				    "!nitems: %" NSC_SZFMT " nblocks: %"
2536 				    NSC_SZFMT " head: 0x%p tail: 0x%p",
2537 				    qp->nitems, qp->blocks,
2538 				    (void *)qp->net_qhead,
2539 				    (void *)qp->net_qtail);
2540 #endif
2541 			} while (krdc->group->rdc_thrnum > 0);
2542 		}
2543 	}
2544 
2545 	mutex_enter(&rdc_conf_lock);
2546 	ip = krdc->intf;
2547 	krdc->intf = 0;
2548 
2549 	if (ip) {
2550 		rdc_remove_from_if(ip);
2551 	}
2552 
2553 	mutex_exit(&rdc_conf_lock);
2554 
2555 	rdc_group_enter(krdc);
2556 
2557 	/* Configured but not enabled */
2558 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2559 
2560 	/* Must not hold group lock during this function */
2561 	rdc_group_exit(krdc);
2562 	while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
2563 		delay(2);
2564 	rdc_group_enter(krdc);
2565 
2566 	(void) rdc_clear_state(krdc);
2567 
2568 	rdc_free_bitmap(krdc, RDC_CMD_DISABLE);
2569 	rdc_close_bitmap(krdc);
2570 
2571 	rdc_dev_close(krdc);
2572 	rdc_close_direct(krdc);
2573 
2574 	/* Configured but not enabled */
2575 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2576 
2577 	rdc_group_exit(krdc);
2578 
2579 	/*
2580 	 * we should now unregister the queue, with no conflicting
2581 	 * locks held. This is the last(only) member of the group
2582 	 */
2583 	if (krdc->group && RDC_IS_DISKQ(krdc->group) &&
2584 	    krdc->group->count == 1) { /* stop protecting queue */
2585 		rdc_unintercept_diskq(krdc->group);
2586 	}
2587 
2588 	mutex_enter(&rdc_conf_lock);
2589 
2590 	/* Configured but not enabled */
2591 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
2592 
2593 	wait_busy(krdc);
2594 
2595 	if (IS_MANY(krdc) || IS_MULTI(krdc))
2596 		remove_from_many(krdc);
2597 
2598 	remove_from_group(krdc);
2599 
2600 	krdc->remote_index = -1;
2601 	ASSERT(krdc->type_flag & RDC_CONFIGURED);
2602 	ASSERT(krdc->type_flag & RDC_DISABLEPEND);
2603 	krdc->type_flag = 0;
2604 #ifdef	DEBUG
2605 	if (krdc->dcio_bitmap)
2606 		cmn_err(CE_WARN, "!_rdc_disable: possible mem leak, "
2607 		    "dcio_bitmap");
2608 #endif
2609 	krdc->dcio_bitmap = NULL;
2610 	krdc->bitmap_ref = NULL;
2611 	krdc->bitmap_size = 0;
2612 	krdc->maxfbas = 0;
2613 	krdc->bitmap_write = 0;
2614 	krdc->disk_status = 0;
2615 	rdc_destroy_svinfo(krdc->lsrv);
2616 	krdc->lsrv = NULL;
2617 	krdc->multi_next = NULL;
2618 
2619 	rdc_u_init(urdc);
2620 
2621 	mutex_exit(&rdc_conf_lock);
2622 	rdc_kstat_delete(index);
2623 
2624 	return (0);
2625 }
2626 
2627 static int
2628 rdc_disable(rdc_config_t *uparms, spcs_s_info_t kstatus)
2629 {
2630 	rdc_k_info_t *krdc;
2631 	int index;
2632 	int rc;
2633 
2634 	mutex_enter(&rdc_conf_lock);
2635 
2636 	index = rdc_lookup_byname(uparms->rdc_set);
2637 	if (index >= 0)
2638 		krdc = &rdc_k_info[index];
2639 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
2640 		mutex_exit(&rdc_conf_lock);
2641 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
2642 		    uparms->rdc_set->secondary.file);
2643 		return (RDC_EALREADY);
2644 	}
2645 
2646 	krdc->type_flag |= RDC_DISABLEPEND;
2647 	wait_busy(krdc);
2648 	if (krdc->type_flag == 0) {
2649 		/* A resume or enable failed */
2650 		mutex_exit(&rdc_conf_lock);
2651 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
2652 		    uparms->rdc_set->secondary.file);
2653 		return (RDC_EALREADY);
2654 	}
2655 	mutex_exit(&rdc_conf_lock);
2656 
2657 	rc = _rdc_disable(krdc, uparms, kstatus);
2658 	return (rc);
2659 }
2660 
2661 
2662 /*
2663  * Checks whether the state of one of the other sets in the 1-many or
2664  * multi-hop config should prevent a sync from starting on this one.
2665  * Return NULL if no just cause or impediment is found, otherwise return
2666  * a pointer to the offending set.
2667  */
2668 static rdc_u_info_t *
2669 rdc_allow_pri_sync(rdc_u_info_t *urdc, int options)
2670 {
2671 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
2672 	rdc_k_info_t *ktmp;
2673 	rdc_u_info_t *utmp;
2674 	rdc_k_info_t *kmulti = NULL;
2675 
2676 	ASSERT(rdc_get_vflags(urdc) & RDC_PRIMARY);
2677 
2678 	rdc_many_enter(krdc);
2679 
2680 	/*
2681 	 * In the reverse sync case we need to check the previous leg of
2682 	 * the multi-hop config. The link to that set can be from any of
2683 	 * the 1-many list, so as we go through we keep an eye open for it.
2684 	 */
2685 	if ((options & RDC_OPT_REVERSE) && (IS_MULTI(krdc))) {
2686 		/* This set links to the first leg */
2687 		ktmp = krdc->multi_next;
2688 		utmp = &rdc_u_info[ktmp->index];
2689 		if (IS_ENABLED(utmp))
2690 			kmulti = ktmp;
2691 	}
2692 
2693 	if (IS_MANY(krdc)) {
2694 		for (ktmp = krdc->many_next; ktmp != krdc;
2695 		    ktmp = ktmp->many_next) {
2696 			utmp = &rdc_u_info[ktmp->index];
2697 
2698 			if (!IS_ENABLED(utmp))
2699 				continue;
2700 
2701 			if (options & RDC_OPT_FORWARD) {
2702 				/*
2703 				 * Reverse sync needed is bad, as it means a
2704 				 * reverse sync in progress or started and
2705 				 * didn't complete, so this primary volume
2706 				 * is not consistent. So we shouldn't copy
2707 				 * it to its secondary.
2708 				 */
2709 				if (rdc_get_mflags(utmp) & RDC_RSYNC_NEEDED) {
2710 					rdc_many_exit(krdc);
2711 					return (utmp);
2712 				}
2713 			} else {
2714 				/* Reverse, so see if we need to spot kmulti */
2715 				if ((kmulti == NULL) && (IS_MULTI(ktmp))) {
2716 					/* This set links to the first leg */
2717 					kmulti = ktmp->multi_next;
2718 					if (!IS_ENABLED(
2719 					    &rdc_u_info[kmulti->index]))
2720 						kmulti = NULL;
2721 				}
2722 
2723 				/*
2724 				 * Non-logging is bad, as the bitmap will
2725 				 * be updated with the bits for this sync.
2726 				 */
2727 				if (!(rdc_get_vflags(utmp) & RDC_LOGGING)) {
2728 					rdc_many_exit(krdc);
2729 					return (utmp);
2730 				}
2731 			}
2732 		}
2733 	}
2734 
2735 	if (kmulti) {
2736 		utmp = &rdc_u_info[kmulti->index];
2737 		ktmp = kmulti;	/* In case we decide we do need to use ktmp */
2738 
2739 		ASSERT(options & RDC_OPT_REVERSE);
2740 
2741 		if (IS_REPLICATING(utmp)) {
2742 			/*
2743 			 * Replicating is bad as data is already flowing to
2744 			 * the target of the requested sync operation.
2745 			 */
2746 			rdc_many_exit(krdc);
2747 			return (utmp);
2748 		}
2749 
2750 		if (rdc_get_vflags(utmp) & RDC_SYNCING) {
2751 			/*
2752 			 * Forward sync in progress is bad, as data is
2753 			 * already flowing to the target of the requested
2754 			 * sync operation.
2755 			 * Reverse sync in progress is bad, as the primary
2756 			 * has already decided which data to copy.
2757 			 */
2758 			rdc_many_exit(krdc);
2759 			return (utmp);
2760 		}
2761 
2762 		/*
2763 		 * Clear the "sync needed" flags, as the multi-hop secondary
2764 		 * will be updated via this requested sync operation, so does
2765 		 * not need to complete its aborted forward sync.
2766 		 */
2767 		if (rdc_get_vflags(utmp) & RDC_SYNC_NEEDED)
2768 			rdc_clr_flags(utmp, RDC_SYNC_NEEDED);
2769 	}
2770 
2771 	if (IS_MANY(krdc) && (options & RDC_OPT_REVERSE)) {
2772 		for (ktmp = krdc->many_next; ktmp != krdc;
2773 		    ktmp = ktmp->many_next) {
2774 			utmp = &rdc_u_info[ktmp->index];
2775 			if (!IS_ENABLED(utmp))
2776 				continue;
2777 
2778 			/*
2779 			 * Clear any "reverse sync needed" flags, as the
2780 			 * volume will be updated via this requested
2781 			 * sync operation, so does not need to complete
2782 			 * its aborted reverse sync.
2783 			 */
2784 			if (rdc_get_mflags(utmp) & RDC_RSYNC_NEEDED)
2785 				rdc_clr_mflags(utmp, RDC_RSYNC_NEEDED);
2786 		}
2787 	}
2788 
2789 	rdc_many_exit(krdc);
2790 
2791 	return (NULL);
2792 }
2793 
2794 static void
2795 _rdc_sync_wrthr(void *thrinfo)
2796 {
2797 	rdc_syncthr_t *syncinfo = (rdc_syncthr_t *)thrinfo;
2798 	nsc_buf_t *handle = NULL;
2799 	rdc_k_info_t *krdc = syncinfo->krdc;
2800 	int rc;
2801 	int tries = 0;
2802 
2803 	DTRACE_PROBE2(rdc_sync_loop_netwrite_start, int, krdc->index,
2804 	    nsc_buf_t *, handle);
2805 
2806 retry:
2807 	rc = nsc_alloc_buf(RDC_U_FD(krdc), syncinfo->offset, syncinfo->len,
2808 	    NSC_READ | NSC_NOCACHE, &handle);
2809 
2810 	if (!RDC_SUCCESS(rc) || krdc->remote_index < 0) {
2811 		DTRACE_PROBE(rdc_sync_wrthr_alloc_buf_err);
2812 		goto failed;
2813 	}
2814 
2815 	rdc_group_enter(krdc);
2816 	if ((krdc->disk_status == 1) || (krdc->dcio_bitmap == NULL)) {
2817 		rdc_group_exit(krdc);
2818 		goto failed;
2819 	}
2820 	rdc_group_exit(krdc);
2821 
2822 	if ((rc = rdc_net_write(krdc->index, krdc->remote_index, handle,
2823 	    handle->sb_pos, handle->sb_len, RDC_NOSEQ, RDC_NOQUE, NULL)) > 0) {
2824 		rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2825 
2826 		/*
2827 		 * The following is to handle
2828 		 * the case where the secondary side
2829 		 * has thrown our buffer handle token away in a
2830 		 * attempt to preserve its health on restart
2831 		 */
2832 		if ((rc == EPROTO) && (tries < 3)) {
2833 			(void) nsc_free_buf(handle);
2834 			handle = NULL;
2835 			tries++;
2836 			delay(HZ >> 2);
2837 			goto retry;
2838 		}
2839 
2840 		DTRACE_PROBE(rdc_sync_wrthr_remote_write_err);
2841 		cmn_err(CE_WARN, "!rdc_sync_wrthr: remote write failed (%d) "
2842 		    "0x%x", rc, rdc_get_vflags(urdc));
2843 
2844 		goto failed;
2845 	}
2846 	(void) nsc_free_buf(handle);
2847 	handle = NULL;
2848 
2849 	return;
2850 failed:
2851 	(void) nsc_free_buf(handle);
2852 	syncinfo->status->offset = syncinfo->offset;
2853 }
2854 
2855 /*
2856  * see above comments on _rdc_sync_wrthr
2857  */
2858 static void
2859 _rdc_sync_rdthr(void *thrinfo)
2860 {
2861 	rdc_syncthr_t *syncinfo = (rdc_syncthr_t *)thrinfo;
2862 	nsc_buf_t *handle = NULL;
2863 	rdc_k_info_t *krdc = syncinfo->krdc;
2864 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2865 	int rc;
2866 
2867 	rc = nsc_alloc_buf(RDC_U_FD(krdc), syncinfo->offset, syncinfo->len,
2868 	    NSC_WRITE | NSC_WRTHRU | NSC_NOCACHE, &handle);
2869 
2870 	if (!RDC_SUCCESS(rc) || krdc->remote_index < 0) {
2871 		goto failed;
2872 	}
2873 	rdc_group_enter(krdc);
2874 	if ((krdc->disk_status == 1) || (krdc->dcio_bitmap == NULL)) {
2875 		rdc_group_exit(krdc);
2876 		goto failed;
2877 	}
2878 	rdc_group_exit(krdc);
2879 
2880 	rc = rdc_net_read(krdc->index, krdc->remote_index, handle,
2881 	    handle->sb_pos, handle->sb_len);
2882 
2883 	if (!RDC_SUCCESS(rc)) {
2884 		cmn_err(CE_WARN, "!rdc_sync_rdthr: remote read failed(%d)", rc);
2885 		goto failed;
2886 	}
2887 	if (!IS_STATE(urdc, RDC_FULL))
2888 		rdc_set_bitmap_many(krdc, handle->sb_pos, handle->sb_len);
2889 
2890 	rc = nsc_write(handle, handle->sb_pos, handle->sb_len, 0);
2891 
2892 	if (!RDC_SUCCESS(rc)) {
2893 		rdc_many_enter(krdc);
2894 		rdc_set_flags_log(urdc, RDC_VOL_FAILED, "nsc_write failed");
2895 		rdc_many_exit(krdc);
2896 		rdc_write_state(urdc);
2897 		goto failed;
2898 	}
2899 
2900 	(void) nsc_free_buf(handle);
2901 	handle = NULL;
2902 
2903 	return;
2904 failed:
2905 	(void) nsc_free_buf(handle);
2906 	syncinfo->status->offset = syncinfo->offset;
2907 }
2908 
2909 /*
2910  * _rdc_sync_wrthr
2911  * sync loop write thread
2912  * if there are avail threads, we have not
2913  * used up the pipe, so the sync loop will, if
2914  * possible use these to multithread the write/read
2915  */
2916 void
2917 _rdc_sync_thread(void *thrinfo)
2918 {
2919 	rdc_syncthr_t *syncinfo = (rdc_syncthr_t *)thrinfo;
2920 	rdc_k_info_t *krdc = syncinfo->krdc;
2921 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2922 	rdc_thrsync_t *sync = &krdc->syncs;
2923 	uint_t bitmask;
2924 	int rc;
2925 
2926 	rc = _rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL);
2927 	if (!RDC_SUCCESS(rc))
2928 		goto failed;
2929 
2930 	if (IS_STATE(urdc, RDC_SLAVE))
2931 		_rdc_sync_rdthr(thrinfo);
2932 	else
2933 		_rdc_sync_wrthr(thrinfo);
2934 
2935 	_rdc_rlse_devs(krdc, RDC_RAW);
2936 
2937 	if (krdc->dcio_bitmap == NULL) {
2938 #ifdef DEBUG
2939 		cmn_err(CE_NOTE, "!_rdc_sync_wrthr: NULL bitmap");
2940 #else
2941 	/*EMPTY*/
2942 #endif
2943 	} else if (syncinfo->status->offset < 0) {
2944 
2945 		RDC_SET_BITMASK(syncinfo->offset, syncinfo->len, &bitmask);
2946 		RDC_CLR_BITMAP(krdc, syncinfo->offset, syncinfo->len, \
2947 		    bitmask, RDC_BIT_FORCE);
2948 	}
2949 
2950 failed:
2951 	/*
2952 	 * done with this, get rid of it.
2953 	 * the status is not freed, it should still be a status chain
2954 	 * that _rdc_sync() has the head of
2955 	 */
2956 	kmem_free(syncinfo, sizeof (*syncinfo));
2957 
2958 	/*
2959 	 * decrement the global sync thread num
2960 	 */
2961 	mutex_enter(&sync_info.lock);
2962 	sync_info.active_thr--;
2963 	/* LINTED */
2964 	RDC_AVAIL_THR_TUNE(sync_info);
2965 	mutex_exit(&sync_info.lock);
2966 
2967 	/*
2968 	 * krdc specific stuff
2969 	 */
2970 	mutex_enter(&sync->lock);
2971 	sync->complete++;
2972 	cv_broadcast(&sync->cv);
2973 	mutex_exit(&sync->lock);
2974 }
2975 
2976 int
2977 _rdc_setup_syncthr(rdc_syncthr_t **synthr, nsc_off_t offset,
2978     nsc_size_t len, rdc_k_info_t *krdc, sync_status_t *stats)
2979 {
2980 	rdc_syncthr_t *tmp;
2981 	/* alloc here, free in the sync thread */
2982 	tmp =
2983 	    (rdc_syncthr_t *)kmem_zalloc(sizeof (rdc_syncthr_t), KM_NOSLEEP);
2984 
2985 	if (tmp == NULL)
2986 		return (-1);
2987 	tmp->offset = offset;
2988 	tmp->len = len;
2989 	tmp->status = stats;
2990 	tmp->krdc = krdc;
2991 
2992 	*synthr = tmp;
2993 	return (0);
2994 }
2995 
2996 sync_status_t *
2997 _rdc_new_sync_status()
2998 {
2999 	sync_status_t *s;
3000 
3001 	s = (sync_status_t *)kmem_zalloc(sizeof (*s), KM_NOSLEEP);
3002 	s->offset = -1;
3003 	return (s);
3004 }
3005 
3006 void
3007 _rdc_free_sync_status(sync_status_t *status)
3008 {
3009 	sync_status_t *s;
3010 
3011 	while (status) {
3012 		s = status->next;
3013 		kmem_free(status, sizeof (*status));
3014 		status = s;
3015 	}
3016 }
3017 int
3018 _rdc_sync_status_ok(sync_status_t *status, int *offset)
3019 {
3020 #ifdef DEBUG_SYNCSTATUS
3021 	int i = 0;
3022 #endif
3023 	while (status) {
3024 		if (status->offset >= 0) {
3025 			*offset = status->offset;
3026 			return (-1);
3027 		}
3028 		status = status->next;
3029 #ifdef DEBUG_SYNCSTATUS
3030 		i++;
3031 #endif
3032 	}
3033 #ifdef DEBUGSYNCSTATUS
3034 	cmn_err(CE_NOTE, "!rdc_sync_status_ok: checked %d statuses", i);
3035 #endif
3036 	return (0);
3037 }
3038 
3039 int mtsync = 1;
3040 /*
3041  * _rdc_sync() : rdc sync loop
3042  *
3043  */
3044 static void
3045 _rdc_sync(rdc_k_info_t *krdc)
3046 {
3047 	nsc_size_t size = 0;
3048 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
3049 	int rtype;
3050 	int sts;
3051 	int reserved = 0;
3052 	nsc_buf_t *alloc_h = NULL;
3053 	nsc_buf_t *handle = NULL;
3054 	nsc_off_t mask;
3055 	nsc_size_t maxbit;
3056 	nsc_size_t len;
3057 	nsc_off_t offset = 0;
3058 	int sync_completed = 0;
3059 	int tries = 0;
3060 	int rc;
3061 	int queuing = 0;
3062 	uint_t bitmask;
3063 	sync_status_t *ss, *sync_status = NULL;
3064 	rdc_thrsync_t *sync = &krdc->syncs;
3065 	rdc_syncthr_t *syncinfo;
3066 	nsthread_t *trc = NULL;
3067 
3068 	if (IS_STATE(urdc, RDC_QUEUING) && !IS_STATE(urdc, RDC_FULL)) {
3069 		/* flusher is handling the sync in the update case */
3070 		queuing = 1;
3071 		goto sync_done;
3072 	}
3073 
3074 	/*
3075 	 * Main sync/resync loop
3076 	 */
3077 	DTRACE_PROBE(rdc_sync_loop_start);
3078 
3079 	rtype = RDC_RAW;
3080 	sts = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
3081 
3082 	DTRACE_PROBE(rdc_sync_loop_rsrv);
3083 
3084 	if (sts != 0)
3085 		goto failed_noincr;
3086 
3087 	reserved = 1;
3088 
3089 	/*
3090 	 * pre-allocate a handle if we can - speeds up the sync.
3091 	 */
3092 
3093 	if (rdc_prealloc_handle) {
3094 		alloc_h = nsc_alloc_handle(RDC_U_FD(krdc), NULL, NULL, NULL);
3095 #ifdef DEBUG
3096 		if (!alloc_h) {
3097 			cmn_err(CE_WARN,
3098 			    "!rdc sync: failed to pre-alloc handle");
3099 		}
3100 #endif
3101 	} else {
3102 		alloc_h = NULL;
3103 	}
3104 
3105 	ASSERT(urdc->volume_size != 0);
3106 	size = urdc->volume_size;
3107 	mask = ~(LOG_TO_FBA_NUM(1) - 1);
3108 	maxbit = FBA_TO_LOG_NUM(size - 1);
3109 
3110 	/*
3111 	 * as this while loop can also move data, it is counted as a
3112 	 * sync loop thread
3113 	 */
3114 	rdc_group_enter(krdc);
3115 	rdc_clr_flags(urdc, RDC_LOGGING);
3116 	rdc_set_flags(urdc, RDC_SYNCING);
3117 	krdc->group->synccount++;
3118 	rdc_group_exit(krdc);
3119 	mutex_enter(&sync_info.lock);
3120 	sync_info.active_thr++;
3121 	/* LINTED */
3122 	RDC_AVAIL_THR_TUNE(sync_info);
3123 	mutex_exit(&sync_info.lock);
3124 
3125 	while (offset < size) {
3126 		rdc_group_enter(krdc);
3127 		ASSERT(krdc->aux_state & RDC_AUXSYNCIP);
3128 		if (krdc->disk_status == 1 || krdc->dcio_bitmap == NULL) {
3129 			rdc_group_exit(krdc);
3130 			if (krdc->disk_status == 1) {
3131 				DTRACE_PROBE(rdc_sync_loop_disk_status_err);
3132 			} else {
3133 				DTRACE_PROBE(rdc_sync_loop_dcio_bitmap_err);
3134 			}
3135 			goto failed;		/* halt sync */
3136 		}
3137 		rdc_group_exit(krdc);
3138 
3139 		if (!(rdc_get_vflags(urdc) & RDC_FULL)) {
3140 			mutex_enter(&krdc->syncbitmutex);
3141 			krdc->syncbitpos = FBA_TO_LOG_NUM(offset);
3142 			len = 0;
3143 
3144 			/* skip unnecessary chunks */
3145 
3146 			while (krdc->syncbitpos <= maxbit &&
3147 			    !RDC_BIT_ISSET(krdc, krdc->syncbitpos)) {
3148 				offset += LOG_TO_FBA_NUM(1);
3149 				krdc->syncbitpos++;
3150 			}
3151 
3152 			/* check for boundary */
3153 
3154 			if (offset >= size) {
3155 				mutex_exit(&krdc->syncbitmutex);
3156 				goto sync_done;
3157 			}
3158 
3159 			/* find maximal length we can transfer */
3160 
3161 			while (krdc->syncbitpos <= maxbit &&
3162 			    RDC_BIT_ISSET(krdc, krdc->syncbitpos)) {
3163 				len += LOG_TO_FBA_NUM(1);
3164 				krdc->syncbitpos++;
3165 				/* we can only read maxfbas anyways */
3166 				if (len >= krdc->maxfbas)
3167 					break;
3168 			}
3169 
3170 			len = min(len, (size - offset));
3171 
3172 		} else {
3173 			len = size - offset;
3174 		}
3175 
3176 		/* truncate to the io provider limit */
3177 		ASSERT(krdc->maxfbas != 0);
3178 		len = min(len, krdc->maxfbas);
3179 
3180 		if (len > LOG_TO_FBA_NUM(1)) {
3181 			/*
3182 			 * If the update is larger than a bitmap chunk,
3183 			 * then truncate to a whole number of bitmap
3184 			 * chunks.
3185 			 *
3186 			 * If the update is smaller than a bitmap
3187 			 * chunk, this must be the last write.
3188 			 */
3189 			len &= mask;
3190 		}
3191 
3192 		if (!(rdc_get_vflags(urdc) & RDC_FULL)) {
3193 			krdc->syncbitpos = FBA_TO_LOG_NUM(offset + len);
3194 			mutex_exit(&krdc->syncbitmutex);
3195 		}
3196 
3197 		/*
3198 		 * Find out if we can reserve a thread here ...
3199 		 * note: skip the mutex for the first check, if the number
3200 		 * is up there, why bother even grabbing the mutex to
3201 		 * only realize that we can't have a thread anyways
3202 		 */
3203 
3204 		if (mtsync && sync_info.active_thr < RDC_MAX_SYNC_THREADS) {
3205 
3206 			mutex_enter(&sync_info.lock);
3207 			if (sync_info.avail_thr >= 1) {
3208 				if (sync_status == NULL) {
3209 					ss = sync_status =
3210 					    _rdc_new_sync_status();
3211 				} else {
3212 					ss = ss->next = _rdc_new_sync_status();
3213 				}
3214 				if (ss == NULL) {
3215 					mutex_exit(&sync_info.lock);
3216 #ifdef DEBUG
3217 					cmn_err(CE_WARN, "!rdc_sync: can't "
3218 					    "allocate status for mt sync");
3219 #endif
3220 					goto retry;
3221 				}
3222 				/*
3223 				 * syncinfo protected by sync_info lock but
3224 				 * not part of the sync_info structure
3225 				 * be careful if moving
3226 				 */
3227 				if (_rdc_setup_syncthr(&syncinfo,
3228 				    offset, len, krdc, ss) < 0) {
3229 					_rdc_free_sync_status(ss);
3230 				}
3231 
3232 				trc = nst_create(sync_info.rdc_syncset,
3233 				    _rdc_sync_thread, syncinfo, NST_SLEEP);
3234 
3235 				if (trc == NULL) {
3236 					mutex_exit(&sync_info.lock);
3237 #ifdef DEBUG
3238 					cmn_err(CE_NOTE, "!rdc_sync: unable to "
3239 					    "mt sync");
3240 #endif
3241 					_rdc_free_sync_status(ss);
3242 					kmem_free(syncinfo, sizeof (*syncinfo));
3243 					syncinfo = NULL;
3244 					goto retry;
3245 				} else {
3246 					mutex_enter(&sync->lock);
3247 					sync->threads++;
3248 					mutex_exit(&sync->lock);
3249 				}
3250 
3251 				sync_info.active_thr++;
3252 				/* LINTED */
3253 				RDC_AVAIL_THR_TUNE(sync_info);
3254 
3255 				mutex_exit(&sync_info.lock);
3256 				goto threaded;
3257 			}
3258 			mutex_exit(&sync_info.lock);
3259 		}
3260 retry:
3261 		handle = alloc_h;
3262 		DTRACE_PROBE(rdc_sync_loop_allocbuf_start);
3263 		if (rdc_get_vflags(urdc) & RDC_SLAVE)
3264 			sts = nsc_alloc_buf(RDC_U_FD(krdc), offset, len,
3265 			    NSC_WRITE | NSC_WRTHRU | NSC_NOCACHE, &handle);
3266 		else
3267 			sts = nsc_alloc_buf(RDC_U_FD(krdc), offset, len,
3268 			    NSC_READ | NSC_NOCACHE, &handle);
3269 
3270 		DTRACE_PROBE(rdc_sync_loop_allocbuf_end);
3271 		if (sts > 0) {
3272 			if (handle && handle != alloc_h) {
3273 				(void) nsc_free_buf(handle);
3274 			}
3275 
3276 			handle = NULL;
3277 			DTRACE_PROBE(rdc_sync_loop_allocbuf_err);
3278 			goto failed;
3279 		}
3280 
3281 		if (rdc_get_vflags(urdc) & RDC_SLAVE) {
3282 			/* overwrite buffer with remote data */
3283 			sts = rdc_net_read(krdc->index, krdc->remote_index,
3284 			    handle, handle->sb_pos, handle->sb_len);
3285 
3286 			if (!RDC_SUCCESS(sts)) {
3287 #ifdef DEBUG
3288 				cmn_err(CE_WARN,
3289 				    "!rdc sync: remote read failed (%d)", sts);
3290 #endif
3291 				DTRACE_PROBE(rdc_sync_loop_remote_read_err);
3292 				goto failed;
3293 			}
3294 			if (!(rdc_get_vflags(urdc) & RDC_FULL))
3295 				rdc_set_bitmap_many(krdc, handle->sb_pos,
3296 				    handle->sb_len);
3297 
3298 			/* commit locally */
3299 
3300 			sts = nsc_write(handle, handle->sb_pos,
3301 			    handle->sb_len, 0);
3302 
3303 			if (!RDC_SUCCESS(sts)) {
3304 				/* reverse sync needed already set */
3305 				rdc_many_enter(krdc);
3306 				rdc_set_flags_log(urdc, RDC_VOL_FAILED,
3307 				    "write failed during sync");
3308 				rdc_many_exit(krdc);
3309 				rdc_write_state(urdc);
3310 				DTRACE_PROBE(rdc_sync_loop_nsc_write_err);
3311 				goto failed;
3312 			}
3313 		} else {
3314 			/* send local data to remote */
3315 			DTRACE_PROBE2(rdc_sync_loop_netwrite_start,
3316 			    int, krdc->index, nsc_buf_t *, handle);
3317 
3318 			if ((sts = rdc_net_write(krdc->index,
3319 			    krdc->remote_index, handle, handle->sb_pos,
3320 			    handle->sb_len, RDC_NOSEQ, RDC_NOQUE, NULL)) > 0) {
3321 
3322 				/*
3323 				 * The following is to handle
3324 				 * the case where the secondary side
3325 				 * has thrown our buffer handle token away in a
3326 				 * attempt to preserve its health on restart
3327 				 */
3328 				if ((sts == EPROTO) && (tries < 3)) {
3329 					(void) nsc_free_buf(handle);
3330 					handle = NULL;
3331 					tries++;
3332 					delay(HZ >> 2);
3333 					goto retry;
3334 				}
3335 #ifdef DEBUG
3336 				cmn_err(CE_WARN,
3337 				    "!rdc sync: remote write failed (%d) 0x%x",
3338 				    sts, rdc_get_vflags(urdc));
3339 #endif
3340 				DTRACE_PROBE(rdc_sync_loop_netwrite_err);
3341 				goto failed;
3342 			}
3343 			DTRACE_PROBE(rdc_sync_loop_netwrite_end);
3344 		}
3345 
3346 		(void) nsc_free_buf(handle);
3347 		handle = NULL;
3348 
3349 		if (krdc->dcio_bitmap == NULL) {
3350 #ifdef DEBUG
3351 			cmn_err(CE_NOTE, "!_rdc_sync: NULL bitmap");
3352 #else
3353 		;
3354 		/*EMPTY*/
3355 #endif
3356 		} else {
3357 
3358 			RDC_SET_BITMASK(offset, len, &bitmask);
3359 			RDC_CLR_BITMAP(krdc, offset, len, bitmask, \
3360 			    RDC_BIT_FORCE);
3361 			ASSERT(!IS_ASYNC(urdc));
3362 		}
3363 
3364 		/*
3365 		 * Only release/reserve if someone is waiting
3366 		 */
3367 		if (krdc->devices->id_release || nsc_waiting(RDC_U_FD(krdc))) {
3368 			DTRACE_PROBE(rdc_sync_loop_rlse_start);
3369 			if (alloc_h) {
3370 				(void) nsc_free_handle(alloc_h);
3371 				alloc_h = NULL;
3372 			}
3373 
3374 			_rdc_rlse_devs(krdc, rtype);
3375 			reserved = 0;
3376 			delay(2);
3377 
3378 			rtype = RDC_RAW;
3379 			sts = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
3380 			if (sts != 0) {
3381 				handle = NULL;
3382 				DTRACE_PROBE(rdc_sync_loop_rdc_rsrv_err);
3383 				goto failed;
3384 			}
3385 
3386 			reserved = 1;
3387 
3388 			if (rdc_prealloc_handle) {
3389 				alloc_h = nsc_alloc_handle(RDC_U_FD(krdc),
3390 				    NULL, NULL, NULL);
3391 #ifdef DEBUG
3392 				if (!alloc_h) {
3393 					cmn_err(CE_WARN, "!rdc_sync: "
3394 					    "failed to pre-alloc handle");
3395 				}
3396 #endif
3397 			}
3398 			DTRACE_PROBE(rdc_sync_loop_rlse_end);
3399 		}
3400 threaded:
3401 		offset += len;
3402 		urdc->sync_pos = offset;
3403 	}
3404 
3405 sync_done:
3406 	sync_completed = 1;
3407 
3408 failed:
3409 	krdc->group->synccount--;
3410 failed_noincr:
3411 	mutex_enter(&sync->lock);
3412 	while (sync->complete != sync->threads) {
3413 		cv_wait(&sync->cv, &sync->lock);
3414 	}
3415 	sync->complete = 0;
3416 	sync->threads = 0;
3417 	mutex_exit(&sync->lock);
3418 
3419 	/*
3420 	 * if sync_completed is 0 here,
3421 	 * we know that the main sync thread failed anyway
3422 	 * so just free the statuses and fail
3423 	 */
3424 	if (sync_completed && (_rdc_sync_status_ok(sync_status, &rc) < 0)) {
3425 		urdc->sync_pos = rc;
3426 		sync_completed = 0; /* at least 1 thread failed */
3427 	}
3428 
3429 	_rdc_free_sync_status(sync_status);
3430 
3431 	/*
3432 	 * we didn't increment, we didn't even sync,
3433 	 * so don't dec sync_info.active_thr
3434 	 */
3435 	if (!queuing) {
3436 		mutex_enter(&sync_info.lock);
3437 		sync_info.active_thr--;
3438 		/* LINTED */
3439 		RDC_AVAIL_THR_TUNE(sync_info);
3440 		mutex_exit(&sync_info.lock);
3441 	}
3442 
3443 	if (handle) {
3444 		(void) nsc_free_buf(handle);
3445 	}
3446 
3447 	if (alloc_h) {
3448 		(void) nsc_free_handle(alloc_h);
3449 	}
3450 
3451 	if (reserved) {
3452 		_rdc_rlse_devs(krdc, rtype);
3453 	}
3454 
3455 notstarted:
3456 	rdc_group_enter(krdc);
3457 	ASSERT(krdc->aux_state & RDC_AUXSYNCIP);
3458 	if (IS_STATE(urdc, RDC_QUEUING))
3459 		rdc_clr_flags(urdc, RDC_QUEUING);
3460 
3461 	if (sync_completed) {
3462 		(void) rdc_net_state(krdc->index, CCIO_DONE);
3463 	} else {
3464 		(void) rdc_net_state(krdc->index, CCIO_ENABLELOG);
3465 	}
3466 
3467 	rdc_clr_flags(urdc, RDC_SYNCING);
3468 	if (rdc_get_vflags(urdc) & RDC_SLAVE) {
3469 		rdc_many_enter(krdc);
3470 		rdc_clr_mflags(urdc, RDC_SLAVE);
3471 		rdc_many_exit(krdc);
3472 	}
3473 	if (krdc->type_flag & RDC_ASYNCMODE)
3474 		rdc_set_flags(urdc, RDC_ASYNC);
3475 	if (sync_completed) {
3476 		rdc_many_enter(krdc);
3477 		rdc_clr_mflags(urdc, RDC_RSYNC_NEEDED);
3478 		rdc_many_exit(krdc);
3479 	} else {
3480 		krdc->remote_index = -1;
3481 		rdc_set_flags_log(urdc, RDC_LOGGING, "sync failed to complete");
3482 	}
3483 	rdc_group_exit(krdc);
3484 	rdc_write_state(urdc);
3485 
3486 	mutex_enter(&net_blk_lock);
3487 	if (sync_completed)
3488 		krdc->sync_done = RDC_COMPLETED;
3489 	else
3490 		krdc->sync_done = RDC_FAILED;
3491 	cv_broadcast(&krdc->synccv);
3492 	mutex_exit(&net_blk_lock);
3493 
3494 }
3495 
3496 
3497 static int
3498 rdc_sync(rdc_config_t *uparms, spcs_s_info_t kstatus)
3499 {
3500 	rdc_set_t *rdc_set = uparms->rdc_set;
3501 	int options = uparms->options;
3502 	int rc = 0;
3503 	int busy = 0;
3504 	int index;
3505 	rdc_k_info_t *krdc;
3506 	rdc_u_info_t *urdc;
3507 	rdc_k_info_t *kmulti;
3508 	rdc_u_info_t *umulti;
3509 	rdc_group_t *group;
3510 	rdc_srv_t *svp;
3511 	int sm, um, md;
3512 	int sync_completed = 0;
3513 	int thrcount;
3514 
3515 	mutex_enter(&rdc_conf_lock);
3516 	index = rdc_lookup_byname(rdc_set);
3517 	if (index >= 0)
3518 		krdc = &rdc_k_info[index];
3519 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
3520 		mutex_exit(&rdc_conf_lock);
3521 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3522 		    rdc_set->secondary.file);
3523 		rc = RDC_EALREADY;
3524 		goto notstarted;
3525 	}
3526 
3527 	urdc = &rdc_u_info[index];
3528 	group = krdc->group;
3529 	set_busy(krdc);
3530 	busy = 1;
3531 	if ((krdc->type_flag == 0) || (krdc->type_flag & RDC_DISABLEPEND)) {
3532 		/* A resume or enable failed  or we raced with a teardown */
3533 		mutex_exit(&rdc_conf_lock);
3534 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3535 		    rdc_set->secondary.file);
3536 		rc = RDC_EALREADY;
3537 		goto notstarted;
3538 	}
3539 	mutex_exit(&rdc_conf_lock);
3540 	rdc_group_enter(krdc);
3541 
3542 	if (!IS_STATE(urdc, RDC_LOGGING)) {
3543 		spcs_s_add(kstatus, RDC_ESETNOTLOGGING, urdc->secondary.intf,
3544 		    urdc->secondary.file);
3545 		rc = RDC_ENOTLOGGING;
3546 		goto notstarted_unlock;
3547 	}
3548 
3549 	if (rdc_check(krdc, rdc_set)) {
3550 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3551 		    rdc_set->secondary.file);
3552 		rc = RDC_EALREADY;
3553 		goto notstarted_unlock;
3554 	}
3555 
3556 	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
3557 		spcs_s_add(kstatus, RDC_ENOTPRIMARY, rdc_set->primary.intf,
3558 		    rdc_set->primary.file, rdc_set->secondary.intf,
3559 		    rdc_set->secondary.file);
3560 		rc = RDC_ENOTPRIMARY;
3561 		goto notstarted_unlock;
3562 	}
3563 
3564 	if ((options & RDC_OPT_REVERSE) && (IS_STATE(urdc, RDC_QUEUING))) {
3565 		/*
3566 		 * cannot reverse sync when queuing, need to go logging first
3567 		 */
3568 		spcs_s_add(kstatus, RDC_EQNORSYNC, rdc_set->primary.intf,
3569 		    rdc_set->primary.file, rdc_set->secondary.intf,
3570 		    rdc_set->secondary.file);
3571 		rc = RDC_EQNORSYNC;
3572 		goto notstarted_unlock;
3573 	}
3574 
3575 	svp = krdc->lsrv;
3576 	krdc->intf = rdc_add_to_if(svp, &(urdc->primary.addr),
3577 	    &(urdc->secondary.addr), 1);
3578 
3579 	if (!krdc->intf) {
3580 		spcs_s_add(kstatus, RDC_EADDTOIF, urdc->primary.intf,
3581 		    urdc->secondary.intf);
3582 		rc = RDC_EADDTOIF;
3583 		goto notstarted_unlock;
3584 	}
3585 
3586 	if (urdc->volume_size == 0) {
3587 		/* Implies reserve failed when previous resume was done */
3588 		rdc_get_details(krdc);
3589 	}
3590 	if (urdc->volume_size == 0) {
3591 		spcs_s_add(kstatus, RDC_ENOBMAP);
3592 		rc = RDC_ENOBMAP;
3593 		goto notstarted_unlock;
3594 	}
3595 
3596 	if (krdc->dcio_bitmap == NULL) {
3597 		if (rdc_resume_bitmap(krdc) < 0) {
3598 			spcs_s_add(kstatus, RDC_ENOBMAP);
3599 			rc = RDC_ENOBMAP;
3600 			goto notstarted_unlock;
3601 		}
3602 	}
3603 
3604 	if ((rdc_get_vflags(urdc) & RDC_BMP_FAILED) && (krdc->bitmapfd)) {
3605 		if (rdc_reset_bitmap(krdc)) {
3606 			spcs_s_add(kstatus, RDC_EBITMAP);
3607 			rc = RDC_EBITMAP;
3608 			goto notstarted_unlock;
3609 		}
3610 	}
3611 
3612 	if (IS_MANY(krdc) || IS_MULTI(krdc)) {
3613 		rdc_u_info_t *ubad;
3614 
3615 		if ((ubad = rdc_allow_pri_sync(urdc, options)) != NULL) {
3616 			spcs_s_add(kstatus, RDC_ESTATE,
3617 			    ubad->primary.intf, ubad->primary.file,
3618 			    ubad->secondary.intf, ubad->secondary.file);
3619 			rc = RDC_ESTATE;
3620 			goto notstarted_unlock;
3621 		}
3622 	}
3623 
3624 	/*
3625 	 * there is a small window where _rdc_sync is still
3626 	 * running, but has cleared the RDC_SYNCING flag.
3627 	 * Use aux_state which is only cleared
3628 	 * after _rdc_sync had done its 'death' broadcast.
3629 	 */
3630 	if (krdc->aux_state & RDC_AUXSYNCIP) {
3631 #ifdef DEBUG
3632 		if (!rdc_get_vflags(urdc) & RDC_SYNCING) {
3633 			cmn_err(CE_WARN, "!rdc_sync: "
3634 			    "RDC_AUXSYNCIP set, SYNCING off");
3635 		}
3636 #endif
3637 		spcs_s_add(kstatus, RDC_ESYNCING, rdc_set->primary.file);
3638 		rc = RDC_ESYNCING;
3639 		goto notstarted_unlock;
3640 	}
3641 	if (krdc->disk_status == 1) {
3642 		spcs_s_add(kstatus, RDC_ESYNCING, rdc_set->primary.file);
3643 		rc = RDC_ESYNCING;
3644 		goto notstarted_unlock;
3645 	}
3646 
3647 	if ((options & RDC_OPT_FORWARD) &&
3648 	    (rdc_get_mflags(urdc) & RDC_RSYNC_NEEDED)) {
3649 		/* cannot forward sync if a reverse sync is needed */
3650 		spcs_s_add(kstatus, RDC_ERSYNCNEEDED, rdc_set->primary.intf,
3651 		    rdc_set->primary.file, rdc_set->secondary.intf,
3652 		    rdc_set->secondary.file);
3653 		rc = RDC_ERSYNCNEEDED;
3654 		goto notstarted_unlock;
3655 	}
3656 
3657 	urdc->sync_pos = 0;
3658 
3659 	/* Check if the rdc set is accessible on the remote node */
3660 	if (rdc_net_getstate(krdc, &sm, &um, &md, FALSE) < 0) {
3661 		/*
3662 		 * Remote end may be inaccessible, or the rdc set is not
3663 		 * enabled at the remote end.
3664 		 */
3665 		spcs_s_add(kstatus, RDC_ECONNOPEN, urdc->secondary.intf,
3666 		    urdc->secondary.file);
3667 		rc = RDC_ECONNOPEN;
3668 		goto notstarted_unlock;
3669 	}
3670 	if (options & RDC_OPT_REVERSE)
3671 		krdc->remote_index = rdc_net_state(index, CCIO_RSYNC);
3672 	else
3673 		krdc->remote_index = rdc_net_state(index, CCIO_SLAVE);
3674 	if (krdc->remote_index < 0) {
3675 		/*
3676 		 * Remote note probably not in a valid state to be synced,
3677 		 * as the state was fetched OK above.
3678 		 */
3679 		spcs_s_add(kstatus, RDC_ERSTATE, urdc->secondary.intf,
3680 		    urdc->secondary.file, urdc->primary.intf,
3681 		    urdc->primary.file);
3682 		rc = RDC_ERSTATE;
3683 		goto notstarted_unlock;
3684 	}
3685 
3686 	rc = check_filesize(index, kstatus);
3687 	if (rc != 0) {
3688 		(void) rdc_net_state(krdc->index, CCIO_ENABLELOG);
3689 		goto notstarted_unlock;
3690 	}
3691 
3692 	krdc->sync_done = 0;
3693 
3694 	mutex_enter(&krdc->bmapmutex);
3695 	krdc->aux_state |= RDC_AUXSYNCIP;
3696 	mutex_exit(&krdc->bmapmutex);
3697 
3698 	if (options & RDC_OPT_REVERSE) {
3699 		rdc_many_enter(krdc);
3700 		rdc_set_mflags(urdc, RDC_SLAVE | RDC_RSYNC_NEEDED);
3701 		mutex_enter(&krdc->bmapmutex);
3702 		rdc_clr_flags(urdc, RDC_VOL_FAILED);
3703 		mutex_exit(&krdc->bmapmutex);
3704 		rdc_write_state(urdc);
3705 		/* LINTED */
3706 		if (kmulti = krdc->multi_next) {
3707 			umulti = &rdc_u_info[kmulti->index];
3708 			if (IS_ENABLED(umulti) && (rdc_get_vflags(umulti) &
3709 			    (RDC_VOL_FAILED | RDC_SYNC_NEEDED))) {
3710 				rdc_clr_flags(umulti, RDC_SYNC_NEEDED);
3711 				rdc_clr_flags(umulti, RDC_VOL_FAILED);
3712 				rdc_write_state(umulti);
3713 			}
3714 		}
3715 		rdc_many_exit(krdc);
3716 	} else {
3717 		rdc_clr_flags(urdc, RDC_FCAL_FAILED);
3718 		rdc_write_state(urdc);
3719 	}
3720 
3721 	if (options & RDC_OPT_UPDATE) {
3722 		ASSERT(urdc->volume_size != 0);
3723 		if (rdc_net_getbmap(index,
3724 		    BMAP_LOG_BYTES(urdc->volume_size)) > 0) {
3725 			spcs_s_add(kstatus, RDC_ENOBMAP);
3726 			rc = RDC_ENOBMAP;
3727 
3728 			(void) rdc_net_state(index, CCIO_ENABLELOG);
3729 
3730 			rdc_clr_flags(urdc, RDC_SYNCING);
3731 			if (options & RDC_OPT_REVERSE) {
3732 				rdc_many_enter(krdc);
3733 				rdc_clr_mflags(urdc, RDC_SLAVE);
3734 				rdc_many_exit(krdc);
3735 			}
3736 			if (krdc->type_flag & RDC_ASYNCMODE)
3737 				rdc_set_flags(urdc, RDC_ASYNC);
3738 			krdc->remote_index = -1;
3739 			rdc_set_flags_log(urdc, RDC_LOGGING,
3740 			    "failed to read remote bitmap");
3741 			rdc_write_state(urdc);
3742 			goto failed;
3743 		}
3744 		rdc_clr_flags(urdc, RDC_FULL);
3745 	} else {
3746 		/*
3747 		 * This is a full sync (not an update sync), mark the
3748 		 * entire bitmap dirty
3749 		 */
3750 		(void) RDC_FILL_BITMAP(krdc, FALSE);
3751 
3752 		rdc_set_flags(urdc, RDC_FULL);
3753 	}
3754 
3755 	rdc_group_exit(krdc);
3756 
3757 	/*
3758 	 * allow diskq->memq flusher to wake up
3759 	 */
3760 	mutex_enter(&krdc->group->ra_queue.net_qlock);
3761 	krdc->group->ra_queue.qfflags &= ~RDC_QFILLSLEEP;
3762 	mutex_exit(&krdc->group->ra_queue.net_qlock);
3763 
3764 	/*
3765 	 * if this is a full sync on a non-diskq set or
3766 	 * a diskq set that has failed, clear the async flag
3767 	 */
3768 	if (krdc->type_flag & RDC_ASYNCMODE) {
3769 		if ((!(options & RDC_OPT_UPDATE)) ||
3770 		    (!RDC_IS_DISKQ(krdc->group)) ||
3771 		    (!(IS_STATE(urdc, RDC_QUEUING)))) {
3772 			/* full syncs, or core queue are synchronous */
3773 			rdc_group_enter(krdc);
3774 			rdc_clr_flags(urdc, RDC_ASYNC);
3775 			rdc_group_exit(krdc);
3776 		}
3777 
3778 		/*
3779 		 * if the queue failed because it was full, lets see
3780 		 * if we can restart it. After _rdc_sync() is done
3781 		 * the modes will switch and we will begin disk
3782 		 * queuing again. NOTE: this should only be called
3783 		 * once per group, as it clears state for all group
3784 		 * members, also clears the async flag for all members
3785 		 */
3786 		if (IS_STATE(urdc, RDC_DISKQ_FAILED)) {
3787 			rdc_unfail_diskq(krdc);
3788 		} else {
3789 		/* don't add insult to injury by flushing a dead queue */
3790 
3791 			/*
3792 			 * if we are updating, and a diskq and
3793 			 * the async thread isn't active, start
3794 			 * it up.
3795 			 */
3796 			if ((options & RDC_OPT_UPDATE) &&
3797 			    (IS_STATE(urdc, RDC_QUEUING))) {
3798 				rdc_group_enter(krdc);
3799 				rdc_clr_flags(urdc, RDC_SYNCING);
3800 				rdc_group_exit(krdc);
3801 				mutex_enter(&krdc->group->ra_queue.net_qlock);
3802 				if (krdc->group->ra_queue.qfill_sleeping ==
3803 				    RDC_QFILL_ASLEEP)
3804 					cv_broadcast(&group->ra_queue.qfcv);
3805 				mutex_exit(&krdc->group->ra_queue.net_qlock);
3806 				thrcount = urdc->asyncthr;
3807 				while ((thrcount-- > 0) &&
3808 				    !krdc->group->rdc_writer) {
3809 					(void) rdc_writer(krdc->index);
3810 				}
3811 			}
3812 		}
3813 	}
3814 
3815 	/*
3816 	 * For a reverse sync, merge the current bitmap with all other sets
3817 	 * that share this volume.
3818 	 */
3819 	if (options & RDC_OPT_REVERSE) {
3820 retry_many:
3821 		rdc_many_enter(krdc);
3822 		if (IS_MANY(krdc)) {
3823 			rdc_k_info_t *kmany;
3824 			rdc_u_info_t *umany;
3825 
3826 			for (kmany = krdc->many_next; kmany != krdc;
3827 			    kmany = kmany->many_next) {
3828 				umany = &rdc_u_info[kmany->index];
3829 				if (!IS_ENABLED(umany))
3830 					continue;
3831 				ASSERT(umany->flags & RDC_PRIMARY);
3832 
3833 				if (!mutex_tryenter(&kmany->group->lock)) {
3834 					rdc_many_exit(krdc);
3835 					/* May merge more than once */
3836 					goto retry_many;
3837 				}
3838 				rdc_merge_bitmaps(krdc, kmany);
3839 				mutex_exit(&kmany->group->lock);
3840 			}
3841 		}
3842 		rdc_many_exit(krdc);
3843 
3844 retry_multi:
3845 		rdc_many_enter(krdc);
3846 		if (IS_MULTI(krdc)) {
3847 			rdc_k_info_t *kmulti = krdc->multi_next;
3848 			rdc_u_info_t *umulti = &rdc_u_info[kmulti->index];
3849 
3850 			if (IS_ENABLED(umulti)) {
3851 				ASSERT(!(umulti->flags & RDC_PRIMARY));
3852 
3853 				if (!mutex_tryenter(&kmulti->group->lock)) {
3854 					rdc_many_exit(krdc);
3855 					goto retry_multi;
3856 				}
3857 				rdc_merge_bitmaps(krdc, kmulti);
3858 				mutex_exit(&kmulti->group->lock);
3859 			}
3860 		}
3861 		rdc_many_exit(krdc);
3862 	}
3863 
3864 	rdc_group_enter(krdc);
3865 
3866 	if (krdc->bitmap_write == 0) {
3867 		if (rdc_write_bitmap_fill(krdc) >= 0)
3868 			krdc->bitmap_write = -1;
3869 	}
3870 
3871 	if (krdc->bitmap_write > 0)
3872 		(void) rdc_write_bitmap(krdc);
3873 
3874 	urdc->bits_set = RDC_COUNT_BITMAP(krdc);
3875 
3876 	rdc_group_exit(krdc);
3877 
3878 	if (options & RDC_OPT_REVERSE) {
3879 		(void) _rdc_sync_event_notify(RDC_SYNC_START,
3880 		    urdc->primary.file, urdc->group_name);
3881 	}
3882 
3883 	/* Now set off the sync itself */
3884 
3885 	mutex_enter(&net_blk_lock);
3886 	if (nsc_create_process(
3887 	    (void (*)(void *))_rdc_sync, (void *)krdc, FALSE)) {
3888 		mutex_exit(&net_blk_lock);
3889 		spcs_s_add(kstatus, RDC_ENOPROC);
3890 		/*
3891 		 * We used to just return here,
3892 		 * but we need to clear the AUXSYNCIP bit
3893 		 * and there is a very small chance that
3894 		 * someone may be waiting on the disk_status flag.
3895 		 */
3896 		rc = RDC_ENOPROC;
3897 		/*
3898 		 * need the group lock held at failed.
3899 		 */
3900 		rdc_group_enter(krdc);
3901 		goto failed;
3902 	}
3903 
3904 	mutex_enter(&rdc_conf_lock);
3905 	wakeup_busy(krdc);
3906 	busy = 0;
3907 	mutex_exit(&rdc_conf_lock);
3908 
3909 	while (krdc->sync_done == 0)
3910 		cv_wait(&krdc->synccv, &net_blk_lock);
3911 	mutex_exit(&net_blk_lock);
3912 
3913 	rdc_group_enter(krdc);
3914 
3915 	if (krdc->sync_done == RDC_FAILED) {
3916 		char siztmp1[16];
3917 		(void) spcs_s_inttostring(
3918 		    urdc->sync_pos, siztmp1, sizeof (siztmp1),
3919 		    0);
3920 		spcs_s_add(kstatus, RDC_EFAIL, siztmp1);
3921 		rc = RDC_EFAIL;
3922 	} else
3923 		sync_completed = 1;
3924 
3925 failed:
3926 	/*
3927 	 * We use this flag now to make halt_sync() wait for
3928 	 * us to terminate and let us take the group lock.
3929 	 */
3930 	krdc->aux_state &= ~RDC_AUXSYNCIP;
3931 	if (krdc->disk_status == 1) {
3932 		krdc->disk_status = 0;
3933 		cv_broadcast(&krdc->haltcv);
3934 	}
3935 
3936 notstarted_unlock:
3937 	rdc_group_exit(krdc);
3938 
3939 	if (sync_completed && (options & RDC_OPT_REVERSE)) {
3940 		(void) _rdc_sync_event_notify(RDC_SYNC_DONE,
3941 		    urdc->primary.file, urdc->group_name);
3942 	}
3943 
3944 notstarted:
3945 	if (busy) {
3946 		mutex_enter(&rdc_conf_lock);
3947 		wakeup_busy(krdc);
3948 		mutex_exit(&rdc_conf_lock);
3949 	}
3950 
3951 	return (rc);
3952 }
3953 
3954 /* ARGSUSED */
3955 static int
3956 _rdc_suspend(rdc_k_info_t *krdc, rdc_set_t *rdc_set, spcs_s_info_t kstatus)
3957 {
3958 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
3959 	rdc_if_t *ip;
3960 	int index = krdc->index;
3961 
3962 	ASSERT(krdc->group != NULL);
3963 	rdc_group_enter(krdc);
3964 #ifdef DEBUG
3965 	ASSERT(rdc_check(krdc, rdc_set) == 0);
3966 #else
3967 	if (rdc_check(krdc, rdc_set)) {
3968 		rdc_group_exit(krdc);
3969 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3970 		    rdc_set->secondary.file);
3971 		return (RDC_EALREADY);
3972 	}
3973 #endif
3974 
3975 	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
3976 		halt_sync(krdc);
3977 		ASSERT(IS_ENABLED(urdc));
3978 	}
3979 
3980 	rdc_group_exit(krdc);
3981 	(void) rdc_unintercept(krdc);
3982 
3983 #ifdef DEBUG
3984 	cmn_err(CE_NOTE, "!SNDR: suspended %s %s", urdc->primary.file,
3985 	    urdc->secondary.file);
3986 #endif
3987 
3988 	/* Configured but not enabled */
3989 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
3990 
3991 
3992 	if (IS_ASYNC(urdc) && !RDC_IS_DISKQ(krdc->group)) {
3993 		int tries = 2; /* in case of possibly stuck flusher threads */
3994 #ifdef DEBUG
3995 		net_queue *qp = &krdc->group->ra_queue;
3996 #endif
3997 		do {
3998 			if (!krdc->group->rdc_writer)
3999 				(void) rdc_writer(krdc->index);
4000 
4001 			(void) rdc_drain_queue(krdc->index);
4002 
4003 		} while (krdc->group->rdc_writer && tries--);
4004 
4005 		/* ok, force it to happen... */
4006 		if (rdc_drain_queue(krdc->index) != 0) {
4007 			do {
4008 				mutex_enter(&krdc->group->ra_queue.net_qlock);
4009 				krdc->group->asyncdis = 1;
4010 				cv_broadcast(&krdc->group->asyncqcv);
4011 				mutex_exit(&krdc->group->ra_queue.net_qlock);
4012 				cmn_err(CE_WARN,
4013 				    "!SNDR: async I/O pending and not flushed "
4014 				    "for %s during suspend",
4015 				    urdc->primary.file);
4016 #ifdef DEBUG
4017 				cmn_err(CE_WARN,
4018 				    "!nitems: %" NSC_SZFMT " nblocks: %"
4019 				    NSC_SZFMT " head: 0x%p tail: 0x%p",
4020 				    qp->nitems, qp->blocks,
4021 				    (void *)qp->net_qhead,
4022 				    (void *)qp->net_qtail);
4023 #endif
4024 			} while (krdc->group->rdc_thrnum > 0);
4025 		}
4026 	}
4027 
4028 	mutex_enter(&rdc_conf_lock);
4029 	ip = krdc->intf;
4030 	krdc->intf = 0;
4031 
4032 	if (ip) {
4033 		rdc_remove_from_if(ip);
4034 	}
4035 
4036 	mutex_exit(&rdc_conf_lock);
4037 
4038 	rdc_group_enter(krdc);
4039 
4040 	/* Configured but not enabled */
4041 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4042 
4043 	rdc_group_exit(krdc);
4044 	/* Must not hold group lock during this function */
4045 	while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
4046 		delay(2);
4047 	rdc_group_enter(krdc);
4048 
4049 	/* Don't rdc_clear_state, unlike _rdc_disable */
4050 
4051 	rdc_free_bitmap(krdc, RDC_CMD_SUSPEND);
4052 	rdc_close_bitmap(krdc);
4053 
4054 	rdc_dev_close(krdc);
4055 	rdc_close_direct(krdc);
4056 
4057 	/* Configured but not enabled */
4058 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4059 
4060 	rdc_group_exit(krdc);
4061 
4062 	/*
4063 	 * we should now unregister the queue, with no conflicting
4064 	 * locks held. This is the last(only) member of the group
4065 	 */
4066 	if (krdc->group && RDC_IS_DISKQ(krdc->group) &&
4067 	    krdc->group->count == 1) { /* stop protecting queue */
4068 		rdc_unintercept_diskq(krdc->group);
4069 	}
4070 
4071 	mutex_enter(&rdc_conf_lock);
4072 
4073 	/* Configured but not enabled */
4074 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4075 
4076 	wait_busy(krdc);
4077 
4078 	if (IS_MANY(krdc) || IS_MULTI(krdc))
4079 		remove_from_many(krdc);
4080 
4081 	remove_from_group(krdc);
4082 
4083 	krdc->remote_index = -1;
4084 	ASSERT(krdc->type_flag & RDC_CONFIGURED);
4085 	ASSERT(krdc->type_flag & RDC_DISABLEPEND);
4086 	krdc->type_flag = 0;
4087 #ifdef	DEBUG
4088 	if (krdc->dcio_bitmap)
4089 		cmn_err(CE_WARN, "!_rdc_suspend: possible mem leak, "
4090 		    "dcio_bitmap");
4091 #endif
4092 	krdc->dcio_bitmap = NULL;
4093 	krdc->bitmap_ref = NULL;
4094 	krdc->bitmap_size = 0;
4095 	krdc->maxfbas = 0;
4096 	krdc->bitmap_write = 0;
4097 	krdc->disk_status = 0;
4098 	rdc_destroy_svinfo(krdc->lsrv);
4099 	krdc->lsrv = NULL;
4100 	krdc->multi_next = NULL;
4101 
4102 	rdc_u_init(urdc);
4103 
4104 	mutex_exit(&rdc_conf_lock);
4105 	rdc_kstat_delete(index);
4106 	return (0);
4107 }
4108 
4109 static int
4110 rdc_suspend(rdc_config_t *uparms, spcs_s_info_t kstatus)
4111 {
4112 	rdc_k_info_t *krdc;
4113 	int index;
4114 	int rc;
4115 
4116 	mutex_enter(&rdc_conf_lock);
4117 
4118 	index = rdc_lookup_byname(uparms->rdc_set);
4119 	if (index >= 0)
4120 		krdc = &rdc_k_info[index];
4121 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
4122 		mutex_exit(&rdc_conf_lock);
4123 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4124 		    uparms->rdc_set->secondary.file);
4125 		return (RDC_EALREADY);
4126 	}
4127 
4128 	krdc->type_flag |= RDC_DISABLEPEND;
4129 	wait_busy(krdc);
4130 	if (krdc->type_flag == 0) {
4131 		/* A resume or enable failed */
4132 		mutex_exit(&rdc_conf_lock);
4133 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4134 		    uparms->rdc_set->secondary.file);
4135 		return (RDC_EALREADY);
4136 	}
4137 	mutex_exit(&rdc_conf_lock);
4138 
4139 	rc = _rdc_suspend(krdc, uparms->rdc_set, kstatus);
4140 	return (rc);
4141 }
4142 
4143 static int
4144 _rdc_resume(rdc_set_t *rdc_set, int options, spcs_s_info_t kstatus)
4145 {
4146 	int index;
4147 	char *rhost;
4148 	struct netbuf *addrp;
4149 	rdc_k_info_t *krdc;
4150 	rdc_u_info_t *urdc;
4151 	rdc_srv_t *svp = NULL;
4152 	char *local_file;
4153 	char *local_bitmap;
4154 	int rc, rc1;
4155 	nsc_size_t maxfbas;
4156 	rdc_group_t *grp;
4157 
4158 	if ((rdc_set->primary.intf[0] == 0) ||
4159 	    (rdc_set->primary.addr.len == 0) ||
4160 	    (rdc_set->primary.file[0] == 0) ||
4161 	    (rdc_set->primary.bitmap[0] == 0) ||
4162 	    (rdc_set->secondary.intf[0] == 0) ||
4163 	    (rdc_set->secondary.addr.len == 0) ||
4164 	    (rdc_set->secondary.file[0] == 0) ||
4165 	    (rdc_set->secondary.bitmap[0] == 0)) {
4166 		spcs_s_add(kstatus, RDC_EEMPTY);
4167 		return (RDC_EEMPTY);
4168 	}
4169 
4170 	/* Next check there aren't any enabled rdc sets which match. */
4171 
4172 	mutex_enter(&rdc_conf_lock);
4173 
4174 	if (rdc_lookup_byname(rdc_set) >= 0) {
4175 		mutex_exit(&rdc_conf_lock);
4176 		spcs_s_add(kstatus, RDC_EENABLED, rdc_set->primary.intf,
4177 		    rdc_set->primary.file, rdc_set->secondary.intf,
4178 		    rdc_set->secondary.file);
4179 		return (RDC_EENABLED);
4180 	}
4181 
4182 	if (rdc_lookup_many2one(rdc_set) >= 0) {
4183 		mutex_exit(&rdc_conf_lock);
4184 		spcs_s_add(kstatus, RDC_EMANY2ONE, rdc_set->primary.intf,
4185 		    rdc_set->primary.file, rdc_set->secondary.intf,
4186 		    rdc_set->secondary.file);
4187 		return (RDC_EMANY2ONE);
4188 	}
4189 
4190 	if (rdc_set->netconfig->knc_proto == NULL) {
4191 		mutex_exit(&rdc_conf_lock);
4192 		spcs_s_add(kstatus, RDC_ENETCONFIG);
4193 		return (RDC_ENETCONFIG);
4194 	}
4195 
4196 	if (rdc_set->primary.addr.len == 0) {
4197 		mutex_exit(&rdc_conf_lock);
4198 		spcs_s_add(kstatus, RDC_ENETBUF, rdc_set->primary.file);
4199 		return (RDC_ENETBUF);
4200 	}
4201 
4202 	if (rdc_set->secondary.addr.len == 0) {
4203 		mutex_exit(&rdc_conf_lock);
4204 		spcs_s_add(kstatus, RDC_ENETBUF, rdc_set->secondary.file);
4205 		return (RDC_ENETBUF);
4206 	}
4207 
4208 	/* Check that the local data volume isn't in use as a bitmap */
4209 	if (options & RDC_OPT_PRIMARY)
4210 		local_file = rdc_set->primary.file;
4211 	else
4212 		local_file = rdc_set->secondary.file;
4213 	if (rdc_lookup_bitmap(local_file) >= 0) {
4214 		mutex_exit(&rdc_conf_lock);
4215 		spcs_s_add(kstatus, RDC_EVOLINUSE, local_file);
4216 		return (RDC_EVOLINUSE);
4217 	}
4218 
4219 	/* check that the secondary data volume isn't in use */
4220 	if (!(options & RDC_OPT_PRIMARY)) {
4221 		local_file = rdc_set->secondary.file;
4222 		if (rdc_lookup_secondary(local_file) >= 0) {
4223 			mutex_exit(&rdc_conf_lock);
4224 			spcs_s_add(kstatus, RDC_EVOLINUSE, local_file);
4225 			return (RDC_EVOLINUSE);
4226 		}
4227 	}
4228 
4229 	/* Check that the bitmap isn't in use as a data volume */
4230 	if (options & RDC_OPT_PRIMARY)
4231 		local_bitmap = rdc_set->primary.bitmap;
4232 	else
4233 		local_bitmap = rdc_set->secondary.bitmap;
4234 	if (rdc_lookup_configured(local_bitmap) >= 0) {
4235 		mutex_exit(&rdc_conf_lock);
4236 		spcs_s_add(kstatus, RDC_EBMPINUSE, local_bitmap);
4237 		return (RDC_EBMPINUSE);
4238 	}
4239 
4240 	/* Check that the bitmap isn't already in use as a bitmap */
4241 	if (rdc_lookup_bitmap(local_bitmap) >= 0) {
4242 		mutex_exit(&rdc_conf_lock);
4243 		spcs_s_add(kstatus, RDC_EBMPINUSE, local_bitmap);
4244 		return (RDC_EBMPINUSE);
4245 	}
4246 
4247 	/* Set urdc->volume_size */
4248 	index = rdc_dev_open(rdc_set, options);
4249 	if (index < 0) {
4250 		mutex_exit(&rdc_conf_lock);
4251 		if (options & RDC_OPT_PRIMARY)
4252 			spcs_s_add(kstatus, RDC_EOPEN, rdc_set->primary.intf,
4253 			    rdc_set->primary.file);
4254 		else
4255 			spcs_s_add(kstatus, RDC_EOPEN, rdc_set->secondary.intf,
4256 			    rdc_set->secondary.file);
4257 		return (RDC_EOPEN);
4258 	}
4259 
4260 	urdc = &rdc_u_info[index];
4261 	krdc = &rdc_k_info[index];
4262 
4263 	/* copy relevant parts of rdc_set to urdc field by field */
4264 
4265 	(void) strncpy(urdc->primary.intf, rdc_set->primary.intf,
4266 	    MAX_RDC_HOST_SIZE);
4267 	(void) strncpy(urdc->secondary.intf, rdc_set->secondary.intf,
4268 	    MAX_RDC_HOST_SIZE);
4269 
4270 	(void) strncpy(urdc->group_name, rdc_set->group_name, NSC_MAXPATH);
4271 
4272 	dup_rdc_netbuf(&rdc_set->primary.addr, &urdc->primary.addr);
4273 	(void) strncpy(urdc->primary.file, rdc_set->primary.file, NSC_MAXPATH);
4274 	(void) strncpy(urdc->primary.bitmap, rdc_set->primary.bitmap,
4275 	    NSC_MAXPATH);
4276 
4277 	dup_rdc_netbuf(&rdc_set->secondary.addr, &urdc->secondary.addr);
4278 	(void) strncpy(urdc->secondary.file, rdc_set->secondary.file,
4279 	    NSC_MAXPATH);
4280 	(void) strncpy(urdc->secondary.bitmap, rdc_set->secondary.bitmap,
4281 	    NSC_MAXPATH);
4282 	(void) strncpy(urdc->disk_queue, rdc_set->disk_queue, NSC_MAXPATH);
4283 	urdc->setid = rdc_set->setid;
4284 
4285 	if ((options & RDC_OPT_SYNC) && urdc->disk_queue[0]) {
4286 		mutex_exit(&rdc_conf_lock);
4287 		rdc_dev_close(krdc);
4288 		spcs_s_add(kstatus, RDC_EQWRONGMODE);
4289 		return (RDC_EQWRONGMODE);
4290 	}
4291 
4292 	/*
4293 	 * init flags now so that state left by failures in add_to_group()
4294 	 * are preserved.
4295 	 */
4296 	rdc_init_flags(urdc);
4297 
4298 	if ((rc1 = add_to_group(krdc, options, RDC_CMD_RESUME)) != 0) {
4299 		if (rc1 == RDC_EQNOADD) { /* something went wrong with queue */
4300 			rdc_fail_diskq(krdc, RDC_WAIT, RDC_NOLOG);
4301 			/* don't return a failure here, continue with resume */
4302 
4303 		} else { /* some other group add failure */
4304 			mutex_exit(&rdc_conf_lock);
4305 			rdc_dev_close(krdc);
4306 			spcs_s_add(kstatus, RDC_EGROUP,
4307 			    rdc_set->primary.intf, rdc_set->primary.file,
4308 			    rdc_set->secondary.intf, rdc_set->secondary.file,
4309 			    rdc_set->group_name);
4310 			return (RDC_EGROUP);
4311 		}
4312 	}
4313 
4314 	/*
4315 	 * maxfbas was set in rdc_dev_open as primary's maxfbas.
4316 	 * If diskq's maxfbas is smaller, then use diskq's.
4317 	 */
4318 	grp = krdc->group;
4319 	if (grp && RDC_IS_DISKQ(grp) && (grp->diskqfd != 0)) {
4320 		rc = _rdc_rsrv_diskq(grp);
4321 		if (RDC_SUCCESS(rc)) {
4322 			rc = nsc_maxfbas(grp->diskqfd, 0, &maxfbas);
4323 			if (rc == 0) {
4324 #ifdef DEBUG
4325 				if (krdc->maxfbas != maxfbas)
4326 					cmn_err(CE_NOTE,
4327 					    "!_rdc_resume: diskq maxfbas = %"
4328 					    NSC_SZFMT ", primary maxfbas = %"
4329 					    NSC_SZFMT, maxfbas, krdc->maxfbas);
4330 #endif
4331 					krdc->maxfbas = min(krdc->maxfbas,
4332 					    maxfbas);
4333 			} else {
4334 				cmn_err(CE_WARN,
4335 				    "!_rdc_resume: diskq maxfbas failed (%d)",
4336 				    rc);
4337 			}
4338 			_rdc_rlse_diskq(grp);
4339 		} else {
4340 			cmn_err(CE_WARN,
4341 			    "!_rdc_resume: diskq reserve failed (%d)", rc);
4342 		}
4343 	}
4344 
4345 	(void) strncpy(urdc->direct_file, rdc_set->direct_file, NSC_MAXPATH);
4346 	if ((options & RDC_OPT_PRIMARY) && rdc_set->direct_file[0]) {
4347 		if (rdc_open_direct(krdc) == NULL)
4348 			rdc_set_flags(urdc, RDC_FCAL_FAILED);
4349 	}
4350 
4351 	krdc->many_next = krdc;
4352 
4353 	ASSERT(krdc->type_flag == 0);
4354 	krdc->type_flag = RDC_CONFIGURED;
4355 
4356 	if (options & RDC_OPT_PRIMARY)
4357 		rdc_set_flags(urdc, RDC_PRIMARY);
4358 
4359 	if (options & RDC_OPT_ASYNC)
4360 		krdc->type_flag |= RDC_ASYNCMODE;
4361 
4362 	set_busy(krdc);
4363 
4364 	urdc->syshostid = rdc_set->syshostid;
4365 
4366 	if (add_to_many(krdc) < 0) {
4367 		mutex_exit(&rdc_conf_lock);
4368 
4369 		rdc_group_enter(krdc);
4370 
4371 		spcs_s_add(kstatus, RDC_EMULTI);
4372 		rc = RDC_EMULTI;
4373 		goto fail;
4374 	}
4375 
4376 	/* Configured but not enabled */
4377 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4378 
4379 	mutex_exit(&rdc_conf_lock);
4380 
4381 	if (urdc->volume_size == 0) {
4382 		rdc_many_enter(krdc);
4383 		if (options & RDC_OPT_PRIMARY)
4384 			rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
4385 		else
4386 			rdc_set_flags(urdc, RDC_SYNC_NEEDED);
4387 		rdc_set_flags(urdc, RDC_VOL_FAILED);
4388 		rdc_many_exit(krdc);
4389 	}
4390 
4391 	rdc_group_enter(krdc);
4392 
4393 	/* Configured but not enabled */
4394 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4395 
4396 	/*
4397 	 * The rdc set is configured but not yet enabled. Other operations must
4398 	 * ignore this set until it is enabled.
4399 	 */
4400 
4401 	urdc->sync_pos = 0;
4402 
4403 	/* Set tunable defaults, we'll pick up tunables from the header later */
4404 
4405 	urdc->maxqfbas = rdc_maxthres_queue;
4406 	urdc->maxqitems = rdc_max_qitems;
4407 	urdc->autosync = 0;
4408 	urdc->asyncthr = rdc_asyncthr;
4409 
4410 	urdc->netconfig = rdc_set->netconfig;
4411 
4412 	if (options & RDC_OPT_PRIMARY) {
4413 		rhost = rdc_set->secondary.intf;
4414 		addrp = &rdc_set->secondary.addr;
4415 	} else {
4416 		rhost = rdc_set->primary.intf;
4417 		addrp = &rdc_set->primary.addr;
4418 	}
4419 
4420 	if (options & RDC_OPT_ASYNC)
4421 		rdc_set_flags(urdc, RDC_ASYNC);
4422 
4423 	svp = rdc_create_svinfo(rhost, addrp, urdc->netconfig);
4424 	if (svp == NULL) {
4425 		spcs_s_add(kstatus, ENOMEM);
4426 		rc = ENOMEM;
4427 		goto fail;
4428 	}
4429 
4430 	urdc->netconfig = NULL;		/* This will be no good soon */
4431 
4432 	/* Don't set krdc->intf here */
4433 	rdc_kstat_create(index);
4434 
4435 	/* if the bitmap resume isn't clean, it will clear queuing flag */
4436 
4437 	(void) rdc_resume_bitmap(krdc);
4438 
4439 	if (RDC_IS_DISKQ(krdc->group)) {
4440 		disk_queue *q = &krdc->group->diskq;
4441 		if ((rc1 == RDC_EQNOADD) ||
4442 		    IS_QSTATE(q, RDC_QBADRESUME)) {
4443 			rdc_clr_flags(urdc, RDC_QUEUING);
4444 			RDC_ZERO_BITREF(krdc);
4445 		}
4446 	}
4447 
4448 	if (krdc->lsrv == NULL)
4449 		krdc->lsrv = svp;
4450 	else {
4451 #ifdef DEBUG
4452 		cmn_err(CE_WARN, "!_rdc_resume: krdc->lsrv already set: %p",
4453 		    (void *) krdc->lsrv);
4454 #endif
4455 		rdc_destroy_svinfo(svp);
4456 	}
4457 	svp = NULL;
4458 
4459 	/* Configured but not enabled */
4460 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4461 
4462 	/* And finally */
4463 
4464 	krdc->remote_index = -1;
4465 
4466 	/* Should we set the whole group logging? */
4467 	rdc_set_flags(urdc, RDC_ENABLED | RDC_LOGGING);
4468 
4469 	rdc_group_exit(krdc);
4470 
4471 	if (rdc_intercept(krdc) != 0) {
4472 		rdc_group_enter(krdc);
4473 		rdc_clr_flags(urdc, RDC_ENABLED);
4474 		if (options & RDC_OPT_PRIMARY)
4475 			spcs_s_add(kstatus, RDC_EREGISTER, urdc->primary.file);
4476 		else
4477 			spcs_s_add(kstatus, RDC_EREGISTER,
4478 			    urdc->secondary.file);
4479 #ifdef DEBUG
4480 		cmn_err(CE_NOTE, "!nsc_register_path failed %s",
4481 		    urdc->primary.file);
4482 #endif
4483 		rc = RDC_EREGISTER;
4484 		goto bmpfail;
4485 	}
4486 #ifdef DEBUG
4487 	cmn_err(CE_NOTE, "!SNDR: resumed %s %s", urdc->primary.file,
4488 	    urdc->secondary.file);
4489 #endif
4490 
4491 	rdc_write_state(urdc);
4492 
4493 	mutex_enter(&rdc_conf_lock);
4494 	wakeup_busy(krdc);
4495 	mutex_exit(&rdc_conf_lock);
4496 
4497 	return (0);
4498 
4499 bmpfail:
4500 	if (options & RDC_OPT_PRIMARY)
4501 		spcs_s_add(kstatus, RDC_EBITMAP, urdc->primary.bitmap);
4502 	else
4503 		spcs_s_add(kstatus, RDC_EBITMAP, urdc->secondary.bitmap);
4504 	rc = RDC_EBITMAP;
4505 	if (rdc_get_vflags(urdc) & RDC_ENABLED) {
4506 		rdc_group_exit(krdc);
4507 		(void) rdc_unintercept(krdc);
4508 		rdc_group_enter(krdc);
4509 	}
4510 
4511 fail:
4512 	rdc_kstat_delete(index);
4513 	/* Don't unset krdc->intf here, unlike _rdc_enable */
4514 
4515 	/* Configured but not enabled */
4516 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4517 
4518 	rdc_dev_close(krdc);
4519 	rdc_close_direct(krdc);
4520 	rdc_destroy_svinfo(svp);
4521 
4522 	/* Configured but not enabled */
4523 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4524 
4525 	rdc_group_exit(krdc);
4526 
4527 	mutex_enter(&rdc_conf_lock);
4528 
4529 	/* Configured but not enabled */
4530 	ASSERT(IS_CONFIGURED(krdc) && !IS_ENABLED(urdc));
4531 
4532 	remove_from_group(krdc);
4533 
4534 	if (IS_MANY(krdc) || IS_MULTI(krdc))
4535 		remove_from_many(krdc);
4536 
4537 	rdc_u_init(urdc);
4538 
4539 	ASSERT(krdc->type_flag & RDC_CONFIGURED);
4540 	krdc->type_flag = 0;
4541 	wakeup_busy(krdc);
4542 
4543 	mutex_exit(&rdc_conf_lock);
4544 
4545 	return (rc);
4546 }
4547 
4548 static int
4549 rdc_resume(rdc_config_t *uparms, spcs_s_info_t kstatus)
4550 {
4551 	char itmp[10];
4552 	int rc;
4553 
4554 	if (!(uparms->options & RDC_OPT_SYNC) &&
4555 	    !(uparms->options & RDC_OPT_ASYNC)) {
4556 		(void) spcs_s_inttostring(
4557 		    uparms->options, itmp, sizeof (itmp), 1);
4558 		spcs_s_add(kstatus, RDC_EEINVAL, itmp);
4559 		rc = RDC_EEINVAL;
4560 		goto done;
4561 	}
4562 
4563 	if (!(uparms->options & RDC_OPT_PRIMARY) &&
4564 	    !(uparms->options & RDC_OPT_SECONDARY)) {
4565 		(void) spcs_s_inttostring(
4566 		    uparms->options, itmp, sizeof (itmp), 1);
4567 		spcs_s_add(kstatus, RDC_EEINVAL, itmp);
4568 		rc = RDC_EEINVAL;
4569 		goto done;
4570 	}
4571 
4572 	rc = _rdc_resume(uparms->rdc_set, uparms->options, kstatus);
4573 done:
4574 	return (rc);
4575 }
4576 
4577 /*
4578  * if rdc_group_log is called because a volume has failed,
4579  * we must disgard the queue to preserve write ordering.
4580  * later perhaps, we can keep queuing, but we would have to
4581  * rewrite the i/o path to acommodate that. currently, if there
4582  * is a volume failure, the buffers are satisfied remotely and
4583  * there is no way to satisfy them from the current diskq config
4584  * phew, if we do that.. it will be difficult
4585  */
4586 int
4587 rdc_can_queue(rdc_k_info_t *krdc)
4588 {
4589 	rdc_k_info_t *p;
4590 	rdc_u_info_t *q;
4591 
4592 	for (p = krdc->group_next; ; p = p->group_next) {
4593 		q = &rdc_u_info[p->index];
4594 		if (IS_STATE(q, RDC_VOL_FAILED))
4595 			return (0);
4596 		if (p == krdc)
4597 			break;
4598 	}
4599 	return (1);
4600 }
4601 
4602 /*
4603  * wait here, until all in flight async i/o's have either
4604  * finished or failed. Avoid the race with r_net_state()
4605  * which tells remote end to log.
4606  */
4607 void
4608 rdc_inflwait(rdc_group_t *grp)
4609 {
4610 	int bail = RDC_CLNT_TMOUT * 2; /* to include retries */
4611 	volatile int *inflitems;
4612 
4613 	if (RDC_IS_DISKQ(grp))
4614 		inflitems = (&(grp->diskq.inflitems));
4615 	else
4616 		inflitems = (&(grp->ra_queue.inflitems));
4617 
4618 	while (*inflitems && (--bail > 0))
4619 		delay(HZ);
4620 }
4621 
4622 void
4623 rdc_group_log(rdc_k_info_t *krdc, int flag, char *why)
4624 {
4625 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
4626 	rdc_k_info_t *p;
4627 	rdc_u_info_t *q;
4628 	int do_group;
4629 	int sm, um, md;
4630 	disk_queue *dq;
4631 
4632 	void (*flag_op)(rdc_u_info_t *urdc, int flag);
4633 
4634 	ASSERT(MUTEX_HELD(&krdc->group->lock));
4635 
4636 	if (!IS_ENABLED(urdc))
4637 		return;
4638 
4639 	rdc_many_enter(krdc);
4640 
4641 	if ((flag & RDC_QUEUING) && (!IS_STATE(urdc, RDC_SYNCING)) &&
4642 	    (rdc_can_queue(krdc))) {
4643 		flag_op = rdc_set_flags; /* keep queuing, link error */
4644 		flag &= ~RDC_FLUSH;
4645 	} else {
4646 		flag_op = rdc_clr_flags; /* stop queuing, user request */
4647 	}
4648 
4649 	do_group = 1;
4650 	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY))
4651 		do_group = 0;
4652 	else if ((urdc->group_name[0] == 0) ||
4653 	    (rdc_get_vflags(urdc) & RDC_LOGGING) ||
4654 	    (rdc_get_vflags(urdc) & RDC_SYNCING))
4655 		do_group = 0;
4656 	if (do_group) {
4657 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
4658 			q = &rdc_u_info[p->index];
4659 			if (!IS_ENABLED(q))
4660 				continue;
4661 			if ((rdc_get_vflags(q) & RDC_LOGGING) ||
4662 			    (rdc_get_vflags(q) & RDC_SYNCING)) {
4663 				do_group = 0;
4664 				break;
4665 			}
4666 		}
4667 	}
4668 	if (!do_group && (flag & RDC_FORCE_GROUP))
4669 		do_group = 1;
4670 
4671 	rdc_many_exit(krdc);
4672 	dq = &krdc->group->diskq;
4673 	if (do_group) {
4674 #ifdef DEBUG
4675 		cmn_err(CE_NOTE, "!SNDR:Group point-in-time for grp: %s %s:%s",
4676 		    urdc->group_name, urdc->primary.intf, urdc->secondary.intf);
4677 #endif
4678 		DTRACE_PROBE(rdc_diskq_group_PIT);
4679 
4680 		/* Set group logging at the same PIT under rdc_many_lock */
4681 		rdc_many_enter(krdc);
4682 		rdc_set_flags_log(urdc, RDC_LOGGING, why);
4683 		if (RDC_IS_DISKQ(krdc->group))
4684 			flag_op(urdc, RDC_QUEUING);
4685 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
4686 			q = &rdc_u_info[p->index];
4687 			if (!IS_ENABLED(q))
4688 				continue;
4689 			rdc_set_flags_log(q, RDC_LOGGING,
4690 			    "consistency group member following leader");
4691 			if (RDC_IS_DISKQ(p->group))
4692 				flag_op(q, RDC_QUEUING);
4693 		}
4694 
4695 		rdc_many_exit(krdc);
4696 
4697 		/*
4698 		 * This can cause the async threads to fail,
4699 		 * which in turn will call rdc_group_log()
4700 		 * again. Release the lock and re-aquire.
4701 		 */
4702 		rdc_group_exit(krdc);
4703 
4704 		while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
4705 			delay(2);
4706 		if (!RDC_IS_DISKQ(krdc->group))
4707 			RDC_ZERO_BITREF(krdc);
4708 
4709 		rdc_inflwait(krdc->group);
4710 
4711 		/*
4712 		 * a little lazy, but neat. recall dump_alloc_bufs to
4713 		 * ensure that the queue pointers & seq are reset properly
4714 		 * after we have waited for inflight stuff
4715 		 */
4716 		while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
4717 			delay(2);
4718 
4719 		rdc_group_enter(krdc);
4720 		if (RDC_IS_DISKQ(krdc->group) && (!(flag & RDC_QUEUING))) {
4721 			/* fail or user request */
4722 			RDC_ZERO_BITREF(krdc);
4723 			mutex_enter(&krdc->group->diskq.disk_qlock);
4724 			rdc_init_diskq_header(krdc->group,
4725 			    &krdc->group->diskq.disk_hdr);
4726 			SET_QNXTIO(dq, QHEAD(dq));
4727 			mutex_exit(&krdc->group->diskq.disk_qlock);
4728 		}
4729 
4730 		if (flag & RDC_ALLREMOTE) {
4731 			/* Tell other node to start logging */
4732 			if (krdc->lsrv && krdc->intf && !krdc->intf->if_down)
4733 				(void) rdc_net_state(krdc->index,
4734 				    CCIO_ENABLELOG);
4735 		}
4736 
4737 		if (flag & (RDC_ALLREMOTE | RDC_OTHERREMOTE)) {
4738 			rdc_many_enter(krdc);
4739 			for (p = krdc->group_next; p != krdc;
4740 			    p = p->group_next) {
4741 				if (p->lsrv && krdc->intf &&
4742 				    !krdc->intf->if_down) {
4743 					(void) rdc_net_state(p->index,
4744 					    CCIO_ENABLELOG);
4745 				}
4746 			}
4747 			rdc_many_exit(krdc);
4748 		}
4749 
4750 		rdc_write_state(urdc);
4751 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
4752 			q = &rdc_u_info[p->index];
4753 			if (!IS_ENABLED(q))
4754 				continue;
4755 			rdc_write_state(q);
4756 		}
4757 	} else {
4758 		/* No point in time is possible, just deal with single set */
4759 
4760 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
4761 			halt_sync(krdc);
4762 		} else {
4763 			if (rdc_net_getstate(krdc, &sm, &um, &md, TRUE) < 0) {
4764 				rdc_clr_flags(urdc, RDC_SYNCING);
4765 				rdc_set_flags_log(urdc, RDC_LOGGING,
4766 				    "failed to read remote state");
4767 
4768 				rdc_write_state(urdc);
4769 				while (rdc_dump_alloc_bufs_cd(krdc->index)
4770 				    == EAGAIN)
4771 					delay(2);
4772 				if ((RDC_IS_DISKQ(krdc->group)) &&
4773 				    (!(flag & RDC_QUEUING))) { /* fail! */
4774 					mutex_enter(QLOCK(dq));
4775 					rdc_init_diskq_header(krdc->group,
4776 					    &krdc->group->diskq.disk_hdr);
4777 					SET_QNXTIO(dq, QHEAD(dq));
4778 					mutex_exit(QLOCK(dq));
4779 				}
4780 
4781 				return;
4782 			}
4783 		}
4784 
4785 		if (rdc_get_vflags(urdc) & RDC_SYNCING)
4786 			return;
4787 
4788 		if (RDC_IS_DISKQ(krdc->group))
4789 			flag_op(urdc, RDC_QUEUING);
4790 
4791 		if ((RDC_IS_DISKQ(krdc->group)) &&
4792 		    (!(flag & RDC_QUEUING))) { /* fail! */
4793 			RDC_ZERO_BITREF(krdc);
4794 			mutex_enter(QLOCK(dq));
4795 			rdc_init_diskq_header(krdc->group,
4796 			    &krdc->group->diskq.disk_hdr);
4797 			SET_QNXTIO(dq, QHEAD(dq));
4798 			mutex_exit(QLOCK(dq));
4799 		}
4800 
4801 		if (!(rdc_get_vflags(urdc) & RDC_LOGGING)) {
4802 			rdc_set_flags_log(urdc, RDC_LOGGING, why);
4803 
4804 			rdc_write_state(urdc);
4805 
4806 			while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
4807 				delay(2);
4808 			if (!RDC_IS_DISKQ(krdc->group))
4809 				RDC_ZERO_BITREF(krdc);
4810 
4811 			rdc_inflwait(krdc->group);
4812 			/*
4813 			 * a little lazy, but neat. recall dump_alloc_bufs to
4814 			 * ensure that the queue pointers & seq are reset
4815 			 * properly after we have waited for inflight stuff
4816 			 */
4817 			while (rdc_dump_alloc_bufs_cd(krdc->index) == EAGAIN)
4818 				delay(2);
4819 
4820 			if (flag & RDC_ALLREMOTE) {
4821 				/* Tell other node to start logging */
4822 				if (krdc->lsrv && krdc->intf &&
4823 				    !krdc->intf->if_down) {
4824 					(void) rdc_net_state(krdc->index,
4825 					    CCIO_ENABLELOG);
4826 				}
4827 			}
4828 		}
4829 	}
4830 	/*
4831 	 * just in case any threads were in flight during log cleanup
4832 	 */
4833 	if (RDC_IS_DISKQ(krdc->group)) {
4834 		mutex_enter(QLOCK(dq));
4835 		cv_broadcast(&dq->qfullcv);
4836 		mutex_exit(QLOCK(dq));
4837 	}
4838 }
4839 
4840 static int
4841 _rdc_log(rdc_k_info_t *krdc, rdc_set_t *rdc_set, spcs_s_info_t kstatus)
4842 {
4843 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
4844 	rdc_srv_t *svp;
4845 
4846 	rdc_group_enter(krdc);
4847 	if (rdc_check(krdc, rdc_set)) {
4848 		rdc_group_exit(krdc);
4849 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
4850 		    rdc_set->secondary.file);
4851 		return (RDC_EALREADY);
4852 	}
4853 
4854 	svp = krdc->lsrv;
4855 	if (rdc_get_vflags(urdc) & RDC_PRIMARY)
4856 		krdc->intf = rdc_add_to_if(svp, &(urdc->primary.addr),
4857 		    &(urdc->secondary.addr), 1);
4858 	else
4859 		krdc->intf = rdc_add_to_if(svp, &(urdc->secondary.addr),
4860 		    &(urdc->primary.addr), 0);
4861 
4862 	if (!krdc->intf) {
4863 		rdc_group_exit(krdc);
4864 		spcs_s_add(kstatus, RDC_EADDTOIF, urdc->primary.intf,
4865 		    urdc->secondary.intf);
4866 		return (RDC_EADDTOIF);
4867 	}
4868 
4869 	rdc_group_log(krdc, RDC_FLUSH | RDC_ALLREMOTE, NULL);
4870 
4871 	if (rdc_get_vflags(urdc) & RDC_SYNCING) {
4872 		rdc_group_exit(krdc);
4873 		spcs_s_add(kstatus, RDC_ESYNCING, urdc->primary.file);
4874 		return (RDC_ESYNCING);
4875 	}
4876 
4877 	rdc_group_exit(krdc);
4878 
4879 	return (0);
4880 }
4881 
4882 static int
4883 rdc_log(rdc_config_t *uparms, spcs_s_info_t kstatus)
4884 {
4885 	rdc_k_info_t *krdc;
4886 	int rc = 0;
4887 	int index;
4888 
4889 	mutex_enter(&rdc_conf_lock);
4890 	index = rdc_lookup_byname(uparms->rdc_set);
4891 	if (index >= 0)
4892 		krdc = &rdc_k_info[index];
4893 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
4894 		mutex_exit(&rdc_conf_lock);
4895 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4896 		    uparms->rdc_set->secondary.file);
4897 		return (RDC_EALREADY);
4898 	}
4899 
4900 	set_busy(krdc);
4901 	if (krdc->type_flag == 0) {
4902 		/* A resume or enable failed */
4903 		wakeup_busy(krdc);
4904 		mutex_exit(&rdc_conf_lock);
4905 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4906 		    uparms->rdc_set->secondary.file);
4907 		return (RDC_EALREADY);
4908 	}
4909 	mutex_exit(&rdc_conf_lock);
4910 
4911 	rc = _rdc_log(krdc, uparms->rdc_set, kstatus);
4912 
4913 	mutex_enter(&rdc_conf_lock);
4914 	wakeup_busy(krdc);
4915 	mutex_exit(&rdc_conf_lock);
4916 
4917 	return (rc);
4918 }
4919 
4920 
4921 static int
4922 rdc_wait(rdc_config_t *uparms, spcs_s_info_t kstatus)
4923 {
4924 	rdc_k_info_t *krdc;
4925 	rdc_u_info_t *urdc;
4926 	int index;
4927 	int need_check = 0;
4928 
4929 	mutex_enter(&rdc_conf_lock);
4930 	index = rdc_lookup_byname(uparms->rdc_set);
4931 	if (index >= 0)
4932 		krdc = &rdc_k_info[index];
4933 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
4934 		mutex_exit(&rdc_conf_lock);
4935 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4936 		    uparms->rdc_set->secondary.file);
4937 		return (RDC_EALREADY);
4938 	}
4939 
4940 	urdc = &rdc_u_info[index];
4941 	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
4942 		mutex_exit(&rdc_conf_lock);
4943 		return (0);
4944 	}
4945 
4946 	set_busy(krdc);
4947 	if (krdc->type_flag == 0) {
4948 		/* A resume or enable failed */
4949 		wakeup_busy(krdc);
4950 		mutex_exit(&rdc_conf_lock);
4951 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4952 		    uparms->rdc_set->secondary.file);
4953 		return (RDC_EALREADY);
4954 	}
4955 	mutex_exit(&rdc_conf_lock);
4956 
4957 	rdc_group_enter(krdc);
4958 	if (rdc_check(krdc, uparms->rdc_set)) {
4959 		rdc_group_exit(krdc);
4960 		mutex_enter(&rdc_conf_lock);
4961 		wakeup_busy(krdc);
4962 		mutex_exit(&rdc_conf_lock);
4963 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
4964 		    uparms->rdc_set->secondary.file);
4965 		return (RDC_EALREADY);
4966 	}
4967 
4968 	if ((rdc_get_vflags(urdc) & (RDC_SYNCING | RDC_PRIMARY)) !=
4969 	    (RDC_SYNCING | RDC_PRIMARY)) {
4970 		rdc_group_exit(krdc);
4971 		mutex_enter(&rdc_conf_lock);
4972 		wakeup_busy(krdc);
4973 		mutex_exit(&rdc_conf_lock);
4974 		return (0);
4975 	}
4976 	if (rdc_get_vflags(urdc) & RDC_SYNCING) {
4977 		need_check = 1;
4978 	}
4979 	rdc_group_exit(krdc);
4980 
4981 	mutex_enter(&net_blk_lock);
4982 
4983 	mutex_enter(&rdc_conf_lock);
4984 	wakeup_busy(krdc);
4985 	mutex_exit(&rdc_conf_lock);
4986 
4987 	(void) cv_wait_sig(&krdc->synccv, &net_blk_lock);
4988 
4989 	mutex_exit(&net_blk_lock);
4990 	if (need_check) {
4991 		if (krdc->sync_done == RDC_COMPLETED) {
4992 			return (0);
4993 		} else if (krdc->sync_done == RDC_FAILED) {
4994 			return (EIO);
4995 		}
4996 	}
4997 	return (0);
4998 }
4999 
5000 
5001 static int
5002 rdc_health(rdc_config_t *uparms, spcs_s_info_t kstatus, int *rvp)
5003 {
5004 	rdc_k_info_t *krdc;
5005 	rdc_u_info_t *urdc;
5006 	int rc = 0;
5007 	int index;
5008 
5009 	mutex_enter(&rdc_conf_lock);
5010 	index = rdc_lookup_byname(uparms->rdc_set);
5011 	if (index >= 0)
5012 		krdc = &rdc_k_info[index];
5013 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5014 		mutex_exit(&rdc_conf_lock);
5015 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5016 		    uparms->rdc_set->secondary.file);
5017 		return (RDC_EALREADY);
5018 	}
5019 
5020 	set_busy(krdc);
5021 	if (krdc->type_flag == 0) {
5022 		/* A resume or enable failed */
5023 		wakeup_busy(krdc);
5024 		mutex_exit(&rdc_conf_lock);
5025 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5026 		    uparms->rdc_set->secondary.file);
5027 		return (RDC_EALREADY);
5028 	}
5029 
5030 	mutex_exit(&rdc_conf_lock);
5031 
5032 	rdc_group_enter(krdc);
5033 	if (rdc_check(krdc, uparms->rdc_set)) {
5034 		rdc_group_exit(krdc);
5035 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5036 		    uparms->rdc_set->secondary.file);
5037 		rc = RDC_EALREADY;
5038 		goto done;
5039 	}
5040 
5041 	urdc = &rdc_u_info[index];
5042 	if (rdc_isactive_if(&(urdc->primary.addr), &(urdc->secondary.addr)))
5043 		*rvp = RDC_ACTIVE;
5044 	else
5045 		*rvp = RDC_INACTIVE;
5046 
5047 	rdc_group_exit(krdc);
5048 
5049 done:
5050 	mutex_enter(&rdc_conf_lock);
5051 	wakeup_busy(krdc);
5052 	mutex_exit(&rdc_conf_lock);
5053 
5054 	return (rc);
5055 }
5056 
5057 
5058 static int
5059 rdc_reconfig(rdc_config_t *uparms, spcs_s_info_t kstatus)
5060 {
5061 	rdc_k_info_t *krdc;
5062 	rdc_u_info_t *urdc;
5063 	int rc = -2;
5064 	int index;
5065 
5066 	mutex_enter(&rdc_conf_lock);
5067 	index = rdc_lookup_byname(uparms->rdc_set);
5068 	if (index >= 0)
5069 		krdc = &rdc_k_info[index];
5070 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5071 		mutex_exit(&rdc_conf_lock);
5072 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5073 		    uparms->rdc_set->secondary.file);
5074 		return (RDC_EALREADY);
5075 	}
5076 
5077 	urdc = &rdc_u_info[index];
5078 	set_busy(krdc);
5079 	if (krdc->type_flag == 0) {
5080 		/* A resume or enable failed */
5081 		wakeup_busy(krdc);
5082 		mutex_exit(&rdc_conf_lock);
5083 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5084 		    uparms->rdc_set->secondary.file);
5085 		return (RDC_EALREADY);
5086 	}
5087 
5088 	mutex_exit(&rdc_conf_lock);
5089 
5090 	rdc_group_enter(krdc);
5091 	if (rdc_check(krdc, uparms->rdc_set)) {
5092 		rdc_group_exit(krdc);
5093 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5094 		    uparms->rdc_set->secondary.file);
5095 		rc = RDC_EALREADY;
5096 		goto done;
5097 	}
5098 	if ((rdc_get_vflags(urdc) & RDC_BMP_FAILED) && (krdc->bitmapfd))
5099 		(void) rdc_reset_bitmap(krdc);
5100 
5101 	/* Move to a new bitmap if necessary */
5102 	if (strncmp(urdc->primary.bitmap, uparms->rdc_set->primary.bitmap,
5103 	    NSC_MAXPATH) != 0) {
5104 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
5105 			rc = rdc_move_bitmap(krdc,
5106 			    uparms->rdc_set->primary.bitmap);
5107 		} else {
5108 			(void) strncpy(urdc->primary.bitmap,
5109 			    uparms->rdc_set->primary.bitmap, NSC_MAXPATH);
5110 			/* simulate a succesful rdc_move_bitmap */
5111 			rc = 0;
5112 		}
5113 	}
5114 	if (strncmp(urdc->secondary.bitmap, uparms->rdc_set->secondary.bitmap,
5115 	    NSC_MAXPATH) != 0) {
5116 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
5117 			(void) strncpy(urdc->secondary.bitmap,
5118 			    uparms->rdc_set->secondary.bitmap, NSC_MAXPATH);
5119 			/* simulate a succesful rdc_move_bitmap */
5120 			rc = 0;
5121 		} else {
5122 			rc = rdc_move_bitmap(krdc,
5123 			    uparms->rdc_set->secondary.bitmap);
5124 		}
5125 	}
5126 	if (rc == -1) {
5127 		rdc_group_exit(krdc);
5128 		spcs_s_add(kstatus, RDC_EBMPRECONFIG,
5129 		    uparms->rdc_set->secondary.intf,
5130 		    uparms->rdc_set->secondary.file);
5131 		rc = RDC_EBMPRECONFIG;
5132 		goto done;
5133 	}
5134 
5135 	/*
5136 	 * At this point we fail any other type of reconfig
5137 	 * if not in logging mode and we did not do a bitmap reconfig
5138 	 */
5139 
5140 	if (!(rdc_get_vflags(urdc) & RDC_LOGGING) && rc == -2) {
5141 		/* no other changes possible unless logging */
5142 		rdc_group_exit(krdc);
5143 		spcs_s_add(kstatus, RDC_ENOTLOGGING,
5144 		    uparms->rdc_set->primary.intf,
5145 		    uparms->rdc_set->primary.file,
5146 		    uparms->rdc_set->secondary.intf,
5147 		    uparms->rdc_set->secondary.file);
5148 		rc = RDC_ENOTLOGGING;
5149 		goto done;
5150 	}
5151 	rc = 0;
5152 	/* Change direct file if necessary */
5153 	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
5154 	    strncmp(urdc->direct_file, uparms->rdc_set->direct_file,
5155 	    NSC_MAXPATH)) {
5156 		if (!(rdc_get_vflags(urdc) & RDC_LOGGING)) {
5157 			rdc_group_exit(krdc);
5158 			goto notlogging;
5159 		}
5160 		rdc_close_direct(krdc);
5161 		(void) strncpy(urdc->direct_file, uparms->rdc_set->direct_file,
5162 		    NSC_MAXPATH);
5163 
5164 		if (urdc->direct_file[0]) {
5165 			if (rdc_open_direct(krdc) == NULL)
5166 				rdc_set_flags(urdc, RDC_FCAL_FAILED);
5167 			else
5168 				rdc_clr_flags(urdc, RDC_FCAL_FAILED);
5169 		}
5170 	}
5171 
5172 	rdc_group_exit(krdc);
5173 
5174 	/* Change group if necessary */
5175 	if (strncmp(urdc->group_name, uparms->rdc_set->group_name,
5176 	    NSC_MAXPATH) != 0) {
5177 		char orig_group[NSC_MAXPATH];
5178 		if (!(rdc_get_vflags(urdc) & RDC_LOGGING))
5179 			goto notlogging;
5180 		mutex_enter(&rdc_conf_lock);
5181 
5182 		(void) strncpy(orig_group, urdc->group_name, NSC_MAXPATH);
5183 		(void) strncpy(urdc->group_name, uparms->rdc_set->group_name,
5184 		    NSC_MAXPATH);
5185 
5186 		rc = change_group(krdc, uparms->options);
5187 		if (rc == RDC_EQNOADD) {
5188 			mutex_exit(&rdc_conf_lock);
5189 			spcs_s_add(kstatus, RDC_EQNOADD,
5190 			    uparms->rdc_set->disk_queue);
5191 			goto done;
5192 		} else if (rc < 0) {
5193 			(void) strncpy(urdc->group_name, orig_group,
5194 			    NSC_MAXPATH);
5195 			mutex_exit(&rdc_conf_lock);
5196 			spcs_s_add(kstatus, RDC_EGROUP,
5197 			    urdc->primary.intf, urdc->primary.file,
5198 			    urdc->secondary.intf, urdc->secondary.file,
5199 			    uparms->rdc_set->group_name);
5200 			rc = RDC_EGROUP;
5201 			goto done;
5202 		}
5203 
5204 		mutex_exit(&rdc_conf_lock);
5205 
5206 		if (rc >= 0) {
5207 			if (!(rdc_get_vflags(urdc) & RDC_LOGGING))
5208 				goto notlogging;
5209 			if (uparms->options & RDC_OPT_ASYNC) {
5210 				mutex_enter(&rdc_conf_lock);
5211 				krdc->type_flag |= RDC_ASYNCMODE;
5212 				mutex_exit(&rdc_conf_lock);
5213 				if (uparms->options & RDC_OPT_PRIMARY)
5214 					krdc->bitmap_ref =
5215 					    (uchar_t *)kmem_zalloc(
5216 					    (krdc->bitmap_size * BITS_IN_BYTE *
5217 					    BMAP_REF_PREF_SIZE), KM_SLEEP);
5218 				rdc_group_enter(krdc);
5219 				rdc_set_flags(urdc, RDC_ASYNC);
5220 				rdc_group_exit(krdc);
5221 			} else {
5222 				mutex_enter(&rdc_conf_lock);
5223 				krdc->type_flag &= ~RDC_ASYNCMODE;
5224 				mutex_exit(&rdc_conf_lock);
5225 				rdc_group_enter(krdc);
5226 				rdc_clr_flags(urdc, RDC_ASYNC);
5227 				rdc_group_exit(krdc);
5228 				if (krdc->bitmap_ref) {
5229 					kmem_free(krdc->bitmap_ref,
5230 					    (krdc->bitmap_size * BITS_IN_BYTE *
5231 					    BMAP_REF_PREF_SIZE));
5232 					krdc->bitmap_ref = NULL;
5233 				}
5234 			}
5235 		}
5236 	} else {
5237 		if ((((uparms->options & RDC_OPT_ASYNC) == 0) &&
5238 		    ((krdc->type_flag & RDC_ASYNCMODE) != 0)) ||
5239 		    (((uparms->options & RDC_OPT_ASYNC) != 0) &&
5240 		    ((krdc->type_flag & RDC_ASYNCMODE) == 0))) {
5241 			if (!(rdc_get_vflags(urdc) & RDC_LOGGING))
5242 				goto notlogging;
5243 
5244 			if (krdc->group->count > 1) {
5245 				spcs_s_add(kstatus, RDC_EGROUPMODE);
5246 				rc = RDC_EGROUPMODE;
5247 				goto done;
5248 			}
5249 		}
5250 
5251 		/* Switch sync/async if necessary */
5252 		if (krdc->group->count == 1) {
5253 			/* Only member of group. Can change sync/async */
5254 			if (((uparms->options & RDC_OPT_ASYNC) == 0) &&
5255 			    ((krdc->type_flag & RDC_ASYNCMODE) != 0)) {
5256 				if (!(rdc_get_vflags(urdc) & RDC_LOGGING))
5257 					goto notlogging;
5258 				/* switch to sync */
5259 				mutex_enter(&rdc_conf_lock);
5260 				krdc->type_flag &= ~RDC_ASYNCMODE;
5261 				if (RDC_IS_DISKQ(krdc->group)) {
5262 					krdc->group->flags &= ~RDC_DISKQUE;
5263 					krdc->group->flags |= RDC_MEMQUE;
5264 					rdc_unintercept_diskq(krdc->group);
5265 					mutex_enter(&krdc->group->diskqmutex);
5266 					rdc_close_diskq(krdc->group);
5267 					mutex_exit(&krdc->group->diskqmutex);
5268 					bzero(&urdc->disk_queue,
5269 					    sizeof (urdc->disk_queue));
5270 				}
5271 				mutex_exit(&rdc_conf_lock);
5272 				rdc_group_enter(krdc);
5273 				rdc_clr_flags(urdc, RDC_ASYNC);
5274 				rdc_group_exit(krdc);
5275 				if (krdc->bitmap_ref) {
5276 					kmem_free(krdc->bitmap_ref,
5277 					    (krdc->bitmap_size * BITS_IN_BYTE *
5278 					    BMAP_REF_PREF_SIZE));
5279 					krdc->bitmap_ref = NULL;
5280 				}
5281 			} else if (((uparms->options & RDC_OPT_ASYNC) != 0) &&
5282 			    ((krdc->type_flag & RDC_ASYNCMODE) == 0)) {
5283 				if (!(rdc_get_vflags(urdc) & RDC_LOGGING))
5284 					goto notlogging;
5285 				/* switch to async */
5286 				mutex_enter(&rdc_conf_lock);
5287 				krdc->type_flag |= RDC_ASYNCMODE;
5288 				mutex_exit(&rdc_conf_lock);
5289 				if (uparms->options & RDC_OPT_PRIMARY)
5290 					krdc->bitmap_ref =
5291 					    (uchar_t *)kmem_zalloc(
5292 					    (krdc->bitmap_size * BITS_IN_BYTE *
5293 					    BMAP_REF_PREF_SIZE), KM_SLEEP);
5294 				rdc_group_enter(krdc);
5295 				rdc_set_flags(urdc, RDC_ASYNC);
5296 				rdc_group_exit(krdc);
5297 			}
5298 		}
5299 	}
5300 	/* Reverse concept of primary and secondary */
5301 	if ((uparms->options & RDC_OPT_REVERSE_ROLE) != 0) {
5302 		rdc_set_t rdc_set;
5303 		struct netbuf paddr, saddr;
5304 
5305 		mutex_enter(&rdc_conf_lock);
5306 
5307 		/*
5308 		 * Disallow role reversal for advanced configurations
5309 		 */
5310 
5311 		if (IS_MANY(krdc) || IS_MULTI(krdc)) {
5312 			mutex_exit(&rdc_conf_lock);
5313 			spcs_s_add(kstatus, RDC_EMASTER, urdc->primary.intf,
5314 			    urdc->primary.file, urdc->secondary.intf,
5315 			    urdc->secondary.file);
5316 			return (RDC_EMASTER);
5317 		}
5318 		bzero((void *) &rdc_set, sizeof (rdc_set_t));
5319 		dup_rdc_netbuf(&urdc->primary.addr, &saddr);
5320 		dup_rdc_netbuf(&urdc->secondary.addr, &paddr);
5321 		free_rdc_netbuf(&urdc->primary.addr);
5322 		free_rdc_netbuf(&urdc->secondary.addr);
5323 		dup_rdc_netbuf(&saddr, &urdc->secondary.addr);
5324 		dup_rdc_netbuf(&paddr, &urdc->primary.addr);
5325 		free_rdc_netbuf(&paddr);
5326 		free_rdc_netbuf(&saddr);
5327 		/* copy primary parts of urdc to rdc_set field by field */
5328 		(void) strncpy(rdc_set.primary.intf, urdc->primary.intf,
5329 		    MAX_RDC_HOST_SIZE);
5330 		(void) strncpy(rdc_set.primary.file, urdc->primary.file,
5331 		    NSC_MAXPATH);
5332 		(void) strncpy(rdc_set.primary.bitmap, urdc->primary.bitmap,
5333 		    NSC_MAXPATH);
5334 
5335 		/* Now overwrite urdc primary */
5336 		(void) strncpy(urdc->primary.intf, urdc->secondary.intf,
5337 		    MAX_RDC_HOST_SIZE);
5338 		(void) strncpy(urdc->primary.file, urdc->secondary.file,
5339 		    NSC_MAXPATH);
5340 		(void) strncpy(urdc->primary.bitmap, urdc->secondary.bitmap,
5341 		    NSC_MAXPATH);
5342 
5343 		/* Now ovwewrite urdc secondary */
5344 		(void) strncpy(urdc->secondary.intf, rdc_set.primary.intf,
5345 		    MAX_RDC_HOST_SIZE);
5346 		(void) strncpy(urdc->secondary.file, rdc_set.primary.file,
5347 		    NSC_MAXPATH);
5348 		(void) strncpy(urdc->secondary.bitmap, rdc_set.primary.bitmap,
5349 		    NSC_MAXPATH);
5350 
5351 		if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
5352 			rdc_clr_flags(urdc, RDC_PRIMARY);
5353 			if (krdc->intf) {
5354 				krdc->intf->issecondary = 1;
5355 				krdc->intf->isprimary = 0;
5356 				krdc->intf->if_down = 1;
5357 			}
5358 		} else {
5359 			rdc_set_flags(urdc, RDC_PRIMARY);
5360 			if (krdc->intf) {
5361 				krdc->intf->issecondary = 0;
5362 				krdc->intf->isprimary = 1;
5363 				krdc->intf->if_down = 1;
5364 			}
5365 		}
5366 
5367 		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
5368 		    ((krdc->type_flag & RDC_ASYNCMODE) != 0)) {
5369 			if (!krdc->bitmap_ref)
5370 				krdc->bitmap_ref =
5371 				    (uchar_t *)kmem_zalloc((krdc->bitmap_size *
5372 				    BITS_IN_BYTE * BMAP_REF_PREF_SIZE),
5373 				    KM_SLEEP);
5374 			if (krdc->bitmap_ref == NULL) {
5375 				cmn_err(CE_WARN,
5376 				    "!rdc_reconfig: bitmap_ref alloc %"
5377 				    NSC_SZFMT " failed",
5378 				    krdc->bitmap_size * BITS_IN_BYTE *
5379 				    BMAP_REF_PREF_SIZE);
5380 				mutex_exit(&rdc_conf_lock);
5381 				return (-1);
5382 			}
5383 		}
5384 
5385 		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
5386 		    (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)) {
5387 			/* Primary, so reverse sync needed */
5388 			rdc_many_enter(krdc);
5389 			rdc_clr_flags(urdc, RDC_SYNC_NEEDED);
5390 			rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
5391 			rdc_many_exit(krdc);
5392 		} else if (rdc_get_vflags(urdc) & RDC_RSYNC_NEEDED) {
5393 			/* Secondary, so forward sync needed */
5394 			rdc_many_enter(krdc);
5395 			rdc_clr_flags(urdc, RDC_RSYNC_NEEDED);
5396 			rdc_set_flags(urdc, RDC_SYNC_NEEDED);
5397 			rdc_many_exit(krdc);
5398 		}
5399 
5400 		/*
5401 		 * rewrite bitmap header
5402 		 */
5403 		rdc_write_state(urdc);
5404 		mutex_exit(&rdc_conf_lock);
5405 	}
5406 
5407 done:
5408 	mutex_enter(&rdc_conf_lock);
5409 	wakeup_busy(krdc);
5410 	mutex_exit(&rdc_conf_lock);
5411 
5412 	return (rc);
5413 
5414 notlogging:
5415 	/* no other changes possible unless logging */
5416 	mutex_enter(&rdc_conf_lock);
5417 	wakeup_busy(krdc);
5418 	mutex_exit(&rdc_conf_lock);
5419 	spcs_s_add(kstatus, RDC_ENOTLOGGING, urdc->primary.intf,
5420 	    urdc->primary.file, urdc->secondary.intf,
5421 	    urdc->secondary.file);
5422 	return (RDC_ENOTLOGGING);
5423 }
5424 
5425 static int
5426 rdc_reset(rdc_config_t *uparms, spcs_s_info_t kstatus)
5427 {
5428 	rdc_k_info_t *krdc;
5429 	rdc_u_info_t *urdc;
5430 	int rc = 0;
5431 	int index;
5432 	int cleared_error = 0;
5433 
5434 	mutex_enter(&rdc_conf_lock);
5435 	index = rdc_lookup_byname(uparms->rdc_set);
5436 	if (index >= 0)
5437 		krdc = &rdc_k_info[index];
5438 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5439 		mutex_exit(&rdc_conf_lock);
5440 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5441 		    uparms->rdc_set->secondary.file);
5442 		return (RDC_EALREADY);
5443 	}
5444 
5445 	urdc = &rdc_u_info[index];
5446 	set_busy(krdc);
5447 	if (krdc->type_flag == 0) {
5448 		/* A resume or enable failed */
5449 		wakeup_busy(krdc);
5450 		mutex_exit(&rdc_conf_lock);
5451 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5452 		    uparms->rdc_set->secondary.file);
5453 		return (RDC_EALREADY);
5454 	}
5455 
5456 	mutex_exit(&rdc_conf_lock);
5457 
5458 	rdc_group_enter(krdc);
5459 	if (rdc_check(krdc, uparms->rdc_set)) {
5460 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5461 		    uparms->rdc_set->secondary.file);
5462 		rc = RDC_EALREADY;
5463 		goto done;
5464 	}
5465 
5466 	if ((rdc_get_vflags(urdc) & RDC_BMP_FAILED) && (krdc->bitmapfd)) {
5467 		if (rdc_reset_bitmap(krdc) == 0)
5468 			cleared_error++;
5469 	}
5470 
5471 	/* Fix direct file if necessary */
5472 	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) && urdc->direct_file[0]) {
5473 		if (rdc_open_direct(krdc) == NULL)
5474 			rdc_set_flags(urdc, RDC_FCAL_FAILED);
5475 		else {
5476 			rdc_clr_flags(urdc, RDC_FCAL_FAILED);
5477 			cleared_error++;
5478 		}
5479 	}
5480 
5481 	if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
5482 		rdc_many_enter(krdc);
5483 		rdc_clr_flags(urdc, RDC_VOL_FAILED);
5484 		cleared_error++;
5485 		rdc_many_exit(krdc);
5486 	}
5487 
5488 	if (cleared_error) {
5489 		/* cleared an error so we should be in logging mode */
5490 		rdc_set_flags_log(urdc, RDC_LOGGING, "set reset");
5491 	}
5492 	rdc_group_exit(krdc);
5493 
5494 	if ((rdc_get_vflags(urdc) & RDC_DISKQ_FAILED))
5495 		rdc_unfail_diskq(krdc);
5496 
5497 done:
5498 	mutex_enter(&rdc_conf_lock);
5499 	wakeup_busy(krdc);
5500 	mutex_exit(&rdc_conf_lock);
5501 
5502 	return (rc);
5503 }
5504 
5505 
5506 static int
5507 rdc_tunable(rdc_config_t *uparms, spcs_s_info_t kstatus)
5508 {
5509 	rdc_k_info_t *krdc;
5510 	rdc_u_info_t *urdc;
5511 	rdc_k_info_t *p;
5512 	rdc_u_info_t *q;
5513 	int rc = 0;
5514 	int index;
5515 
5516 	mutex_enter(&rdc_conf_lock);
5517 	index = rdc_lookup_byname(uparms->rdc_set);
5518 	if (index >= 0)
5519 		krdc = &rdc_k_info[index];
5520 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5521 		mutex_exit(&rdc_conf_lock);
5522 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5523 		    uparms->rdc_set->secondary.file);
5524 		return (RDC_EALREADY);
5525 	}
5526 
5527 	urdc = &rdc_u_info[index];
5528 	set_busy(krdc);
5529 	if (krdc->type_flag == 0) {
5530 		/* A resume or enable failed */
5531 		wakeup_busy(krdc);
5532 		mutex_exit(&rdc_conf_lock);
5533 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5534 		    uparms->rdc_set->secondary.file);
5535 		return (RDC_EALREADY);
5536 	}
5537 
5538 	mutex_exit(&rdc_conf_lock);
5539 
5540 	rdc_group_enter(krdc);
5541 	if (rdc_check(krdc, uparms->rdc_set)) {
5542 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5543 		    uparms->rdc_set->secondary.file);
5544 		rc = RDC_EALREADY;
5545 		goto done;
5546 	}
5547 
5548 	if (uparms->rdc_set->maxqfbas > 0) {
5549 		urdc->maxqfbas = uparms->rdc_set->maxqfbas;
5550 		rdc_write_state(urdc);
5551 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
5552 			q = &rdc_u_info[p->index];
5553 			q->maxqfbas = urdc->maxqfbas;
5554 			rdc_write_state(q);
5555 		}
5556 	}
5557 
5558 	if (uparms->rdc_set->maxqitems > 0) {
5559 		urdc->maxqitems = uparms->rdc_set->maxqitems;
5560 		rdc_write_state(urdc);
5561 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
5562 			q = &rdc_u_info[p->index];
5563 			q->maxqitems = urdc->maxqitems;
5564 			rdc_write_state(q);
5565 		}
5566 	}
5567 
5568 	if (uparms->options & RDC_OPT_SET_QNOBLOCK) {
5569 		disk_queue *que;
5570 
5571 		if (!RDC_IS_DISKQ(krdc->group)) {
5572 			spcs_s_add(kstatus, RDC_EQNOQUEUE, urdc->primary.intf,
5573 			    urdc->primary.file, urdc->secondary.intf,
5574 			    urdc->secondary.file);
5575 			rc = RDC_EQNOQUEUE;
5576 			goto done;
5577 		}
5578 
5579 		que = &krdc->group->diskq;
5580 		mutex_enter(QLOCK(que));
5581 		SET_QSTATE(que, RDC_QNOBLOCK);
5582 		/* queue will fail if this fails */
5583 		(void) rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED);
5584 		mutex_exit(QLOCK(que));
5585 
5586 	}
5587 
5588 	if (uparms->options & RDC_OPT_CLR_QNOBLOCK) {
5589 		disk_queue *que;
5590 
5591 		if (!RDC_IS_DISKQ(krdc->group)) {
5592 			spcs_s_add(kstatus, RDC_EQNOQUEUE, urdc->primary.intf,
5593 			    urdc->primary.file, urdc->secondary.intf,
5594 			    urdc->secondary.file);
5595 			rc = RDC_EQNOQUEUE;
5596 			goto done;
5597 		}
5598 		que = &krdc->group->diskq;
5599 		mutex_enter(QLOCK(que));
5600 		CLR_QSTATE(que, RDC_QNOBLOCK);
5601 		/* queue will fail if this fails */
5602 		(void) rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED);
5603 		mutex_exit(QLOCK(que));
5604 
5605 	}
5606 	if (uparms->rdc_set->asyncthr > 0) {
5607 		urdc->asyncthr = uparms->rdc_set->asyncthr;
5608 		rdc_write_state(urdc);
5609 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
5610 			q = &rdc_u_info[p->index];
5611 			q->asyncthr = urdc->asyncthr;
5612 			rdc_write_state(q);
5613 		}
5614 	}
5615 
5616 	if (uparms->rdc_set->autosync >= 0) {
5617 		if (uparms->rdc_set->autosync == 0)
5618 			urdc->autosync = 0;
5619 		else
5620 			urdc->autosync = 1;
5621 
5622 		rdc_write_state(urdc);
5623 
5624 		/* Changed autosync, so update rest of the group */
5625 
5626 		for (p = krdc->group_next; p != krdc; p = p->group_next) {
5627 			q = &rdc_u_info[p->index];
5628 			q->autosync = urdc->autosync;
5629 			rdc_write_state(q);
5630 		}
5631 	}
5632 
5633 done:
5634 	rdc_group_exit(krdc);
5635 
5636 	mutex_enter(&rdc_conf_lock);
5637 	wakeup_busy(krdc);
5638 	mutex_exit(&rdc_conf_lock);
5639 
5640 	return (rc);
5641 }
5642 
5643 static int
5644 rdc_status(void *arg, int mode, rdc_config_t *uparms, spcs_s_info_t kstatus)
5645 {
5646 	rdc_k_info_t *krdc;
5647 	rdc_u_info_t *urdc;
5648 	disk_queue *dqp;
5649 	int rc = 0;
5650 	int index;
5651 	char *ptr;
5652 	extern int rdc_status_copy32(const void *, void *, size_t, int);
5653 
5654 	mutex_enter(&rdc_conf_lock);
5655 	index = rdc_lookup_byname(uparms->rdc_set);
5656 	if (index >= 0)
5657 		krdc = &rdc_k_info[index];
5658 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5659 		mutex_exit(&rdc_conf_lock);
5660 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5661 		    uparms->rdc_set->secondary.file);
5662 		return (RDC_EALREADY);
5663 	}
5664 
5665 	set_busy(krdc);
5666 	if (krdc->type_flag == 0) {
5667 		/* A resume or enable failed */
5668 		wakeup_busy(krdc);
5669 		mutex_exit(&rdc_conf_lock);
5670 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5671 		    uparms->rdc_set->secondary.file);
5672 		return (RDC_EALREADY);
5673 	}
5674 
5675 	mutex_exit(&rdc_conf_lock);
5676 
5677 	rdc_group_enter(krdc);
5678 	if (rdc_check(krdc, uparms->rdc_set)) {
5679 		rdc_group_exit(krdc);
5680 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
5681 		    uparms->rdc_set->secondary.file);
5682 		rc = RDC_EALREADY;
5683 		goto done;
5684 	}
5685 
5686 	urdc = &rdc_u_info[index];
5687 
5688 	/*
5689 	 * sneak out qstate in urdc->flags
5690 	 * this is harmless because it's value is not used
5691 	 * in urdc->flags. the real qstate is kept in
5692 	 * group->diskq->disk_hdr.h.state
5693 	 */
5694 	if (RDC_IS_DISKQ(krdc->group)) {
5695 		dqp = &krdc->group->diskq;
5696 		if (IS_QSTATE(dqp, RDC_QNOBLOCK))
5697 		urdc->flags |= RDC_QNOBLOCK;
5698 	}
5699 
5700 	if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) {
5701 		ptr = (char *)arg + offsetof(struct rdc_config32, rdc_set);
5702 		rc = rdc_status_copy32(urdc, ptr, sizeof (struct rdc_set32),
5703 		    mode);
5704 	} else {
5705 		ptr = (char *)arg + offsetof(struct rdc_config, rdc_set);
5706 		rc = ddi_copyout(urdc, ptr, sizeof (struct rdc_set), mode);
5707 	}
5708 	/* clear out qstate from flags */
5709 	urdc->flags &= ~RDC_QNOBLOCK;
5710 
5711 	if (rc)
5712 		rc = EFAULT;
5713 
5714 	rdc_group_exit(krdc);
5715 done:
5716 	mutex_enter(&rdc_conf_lock);
5717 	wakeup_busy(krdc);
5718 	mutex_exit(&rdc_conf_lock);
5719 
5720 	return (rc);
5721 }
5722 
5723 /*
5724  * Overwrite the bitmap with one supplied by the
5725  * user.
5726  * Copy into all bitmaps that are tracking this volume.
5727  */
5728 
5729 int
5730 rdc_bitmapset(int op, char *sechost, char *secdev, void *bmapaddr, int bmapsz,
5731     nsc_off_t off, int mode)
5732 {
5733 	int rc;
5734 	rdc_k_info_t *krdc;
5735 	int *indexvec;
5736 	int index;
5737 	int indexit;
5738 	kmutex_t **grouplocks;
5739 	int i;
5740 	int groupind;
5741 
5742 	if (off % FBA_SIZE(1)) {
5743 		/* Must be modulo FBA */
5744 		cmn_err(CE_WARN, "!bitmapset: Offset is not on an FBA "
5745 		    "boundary %llu", (unsigned long long)off);
5746 		return (EINVAL);
5747 	}
5748 	if (bmapsz % FBA_SIZE(1)) {
5749 		/* Must be modulo FBA */
5750 		cmn_err(CE_WARN, "!bitmapset: Size is not on an FBA "
5751 		    "boundary %d", bmapsz);
5752 		return (EINVAL);
5753 	}
5754 
5755 	mutex_enter(&rdc_conf_lock);
5756 	index = rdc_lookup_byhostdev(sechost, secdev);
5757 	if (index >= 0) {
5758 		krdc = &rdc_k_info[index];
5759 	}
5760 	if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
5761 		rc = ENODEV;
5762 		mutex_exit(&rdc_conf_lock);
5763 		return (rc);
5764 	}
5765 	indexvec = kmem_alloc(rdc_max_sets * sizeof (int), KM_SLEEP);
5766 	grouplocks = kmem_alloc(rdc_max_sets * sizeof (kmutex_t *), KM_SLEEP);
5767 
5768 	/*
5769 	 * I now have this set, and I want to take the group
5770 	 * lock on it, and all the group locks of all the
5771 	 * sets on the many and multi-hop links.
5772 	 * I have to take the many lock while traversing the
5773 	 * many/multi links.
5774 	 * I think I also need to set the busy count on this
5775 	 * set, otherwise when I drop the conf_lock, what
5776 	 * will stop some other process from coming in and
5777 	 * issuing a disable?
5778 	 */
5779 	set_busy(krdc);
5780 	mutex_exit(&rdc_conf_lock);
5781 
5782 retrylock:
5783 	groupind = 0;
5784 	indexit = 0;
5785 	rdc_many_enter(krdc);
5786 	/*
5787 	 * Take this initial sets group lock first.
5788 	 */
5789 	if (!mutex_tryenter(&krdc->group->lock)) {
5790 		rdc_many_exit(krdc);
5791 		goto retrylock;
5792 	}
5793 
5794 	grouplocks[groupind] = &krdc->group->lock;
5795 	groupind++;
5796 
5797 	rc = rdc_checkforbitmap(index, off + bmapsz);
5798 	if (rc) {
5799 		goto done;
5800 	}
5801 	indexvec[indexit] = index;
5802 	indexit++;
5803 	if (IS_MANY(krdc)) {
5804 		rdc_k_info_t *ktmp;
5805 
5806 		for (ktmp = krdc->many_next; ktmp != krdc;
5807 		    ktmp =  ktmp->many_next) {
5808 			/*
5809 			 * attempt to take the group lock,
5810 			 * if we don't already have it.
5811 			 */
5812 			if (ktmp->group == NULL) {
5813 				rc = ENODEV;
5814 				goto done;
5815 			}
5816 			for (i = 0; i < groupind; i++) {
5817 				if (grouplocks[i] == &ktmp->group->lock)
5818 					/* already have the group lock */
5819 					break;
5820 			}
5821 			/*
5822 			 * didn't find our lock in our collection,
5823 			 * attempt to take group lock.
5824 			 */
5825 			if (i >= groupind) {
5826 				if (!mutex_tryenter(&ktmp->group->lock)) {
5827 					for (i = 0; i < groupind; i++) {
5828 						mutex_exit(grouplocks[i]);
5829 					}
5830 					rdc_many_exit(krdc);
5831 					goto retrylock;
5832 				}
5833 				grouplocks[groupind] = &ktmp->group->lock;
5834 				groupind++;
5835 			}
5836 			rc = rdc_checkforbitmap(ktmp->index, off + bmapsz);
5837 			if (rc == 0) {
5838 				indexvec[indexit] = ktmp->index;
5839 				indexit++;
5840 			} else {
5841 				goto done;
5842 			}
5843 		}
5844 	}
5845 	if (IS_MULTI(krdc)) {
5846 		rdc_k_info_t *kmulti = krdc->multi_next;
5847 
5848 		if (kmulti->group == NULL) {
5849 			rc = ENODEV;
5850 			goto done;
5851 		}
5852 		/*
5853 		 * This can't be in our group already.
5854 		 */
5855 		if (!mutex_tryenter(&kmulti->group->lock)) {
5856 			for (i = 0; i < groupind; i++) {
5857 				mutex_exit(grouplocks[i]);
5858 			}
5859 			rdc_many_exit(krdc);
5860 			goto retrylock;
5861 		}
5862 		grouplocks[groupind] = &kmulti->group->lock;
5863 		groupind++;
5864 
5865 		rc = rdc_checkforbitmap(kmulti->index, off + bmapsz);
5866 		if (rc == 0) {
5867 			indexvec[indexit] = kmulti->index;
5868 			indexit++;
5869 		} else {
5870 			goto done;
5871 		}
5872 	}
5873 	rc = rdc_installbitmap(op, bmapaddr, bmapsz, off, mode, indexvec,
5874 	    indexit);
5875 done:
5876 	for (i = 0; i < groupind; i++) {
5877 		mutex_exit(grouplocks[i]);
5878 	}
5879 	rdc_many_exit(krdc);
5880 	mutex_enter(&rdc_conf_lock);
5881 	wakeup_busy(krdc);
5882 	mutex_exit(&rdc_conf_lock);
5883 	kmem_free(indexvec, rdc_max_sets * sizeof (int));
5884 	kmem_free(grouplocks, rdc_max_sets * sizeof (kmutex_t *));
5885 	return (rc);
5886 }
5887 
5888 static int
5889 rdc_checkforbitmap(int index, nsc_off_t limit)
5890 {
5891 	rdc_k_info_t *krdc;
5892 	rdc_u_info_t *urdc;
5893 
5894 	krdc = &rdc_k_info[index];
5895 	urdc = &rdc_u_info[index];
5896 
5897 	if (!IS_ENABLED(urdc)) {
5898 		return (EIO);
5899 	}
5900 	if (!(rdc_get_vflags(urdc) & RDC_LOGGING)) {
5901 		return (ENXIO);
5902 	}
5903 	if (krdc->dcio_bitmap == NULL) {
5904 		cmn_err(CE_WARN, "!checkforbitmap: No bitmap for set (%s:%s)",
5905 		    urdc->secondary.intf, urdc->secondary.file);
5906 		return (ENOENT);
5907 	}
5908 	if (limit > krdc->bitmap_size) {
5909 		cmn_err(CE_WARN, "!checkbitmap: Bitmap exceeded, "
5910 		    "incore %" NSC_SZFMT " user supplied %" NSC_SZFMT
5911 		    " for set (%s:%s)", krdc->bitmap_size,
5912 		    limit, urdc->secondary.intf, urdc->secondary.file);
5913 		return (ENOSPC);
5914 	}
5915 	return (0);
5916 }
5917 
5918 
5919 
5920 /*
5921  * Copy the user supplied bitmap to this set.
5922  */
5923 static int
5924 rdc_installbitmap(int op, void *bmapaddr, int bmapsz,
5925     nsc_off_t off, int mode, int *vec, int veccnt)
5926 {
5927 	int rc;
5928 	nsc_off_t sfba;
5929 	nsc_off_t efba;
5930 	nsc_off_t fba;
5931 	void *ormem = NULL;
5932 	int len;
5933 	int left;
5934 	int copied;
5935 	int index;
5936 	rdc_k_info_t *krdc;
5937 	rdc_u_info_t *urdc;
5938 
5939 	rc = 0;
5940 	ormem = kmem_alloc(RDC_MAXDATA, KM_SLEEP);
5941 	left = bmapsz;
5942 	copied = 0;
5943 	while (left > 0) {
5944 		if (left > RDC_MAXDATA) {
5945 			len = RDC_MAXDATA;
5946 		} else {
5947 			len = left;
5948 		}
5949 		if (ddi_copyin((char *)bmapaddr + copied, ormem,
5950 		    len, mode)) {
5951 			cmn_err(CE_WARN, "!installbitmap: Copyin failed");
5952 			rc = EFAULT;
5953 			goto out;
5954 		}
5955 		sfba = FBA_NUM(off + copied);
5956 		efba = FBA_NUM(off + copied + len);
5957 		for (index = 0; index < veccnt; index++) {
5958 			krdc = &rdc_k_info[vec[index]];
5959 			urdc = &rdc_u_info[vec[index]];
5960 
5961 			mutex_enter(&krdc->bmapmutex);
5962 			if (op == RDC_BITMAPSET) {
5963 				bcopy(ormem, krdc->dcio_bitmap + off + copied,
5964 				    len);
5965 			} else {
5966 				rdc_lor(ormem,
5967 				    krdc->dcio_bitmap + off + copied, len);
5968 			}
5969 			/*
5970 			 * Maybe this should be just done once outside of
5971 			 * the the loop? (Less work, but leaves a window
5972 			 * where the bits_set doesn't match the bitmap).
5973 			 */
5974 			urdc->bits_set = RDC_COUNT_BITMAP(krdc);
5975 			mutex_exit(&krdc->bmapmutex);
5976 			if (krdc->bitmap_write > 0) {
5977 				for (fba = sfba; fba < efba; fba++) {
5978 					if (rc = rdc_write_bitmap_fba(krdc,
5979 					    fba)) {
5980 
5981 						cmn_err(CE_WARN,
5982 						    "!installbitmap: "
5983 						    "write_bitmap_fba failed "
5984 						    "on fba number %" NSC_SZFMT
5985 						    " set %s:%s", fba,
5986 						    urdc->secondary.intf,
5987 						    urdc->secondary.file);
5988 						goto out;
5989 					}
5990 				}
5991 			}
5992 		}
5993 		copied += len;
5994 		left -= len;
5995 	}
5996 out:
5997 	kmem_free(ormem, RDC_MAXDATA);
5998 	return (rc);
5999 }
6000 
6001 /*
6002  * _rdc_config
6003  */
6004 int
6005 _rdc_config(void *arg, int mode, spcs_s_info_t kstatus, int *rvp)
6006 {
6007 	int rc = 0;
6008 	struct netbuf fsvaddr, tsvaddr;
6009 	struct knetconfig *knconf;
6010 	char *p = NULL, *pf = NULL;
6011 	struct rdc_config *uap;
6012 	STRUCT_DECL(knetconfig, knconf_tmp);
6013 	STRUCT_DECL(rdc_config, uparms);
6014 	int enable, disable;
6015 	int cmd;
6016 
6017 
6018 	STRUCT_HANDLE(rdc_set, rs);
6019 	STRUCT_HANDLE(rdc_addr, pa);
6020 	STRUCT_HANDLE(rdc_addr, sa);
6021 
6022 	STRUCT_INIT(uparms, mode);
6023 
6024 	bzero(STRUCT_BUF(uparms), STRUCT_SIZE(uparms));
6025 	bzero(&fsvaddr, sizeof (fsvaddr));
6026 	bzero(&tsvaddr, sizeof (tsvaddr));
6027 
6028 	knconf = NULL;
6029 
6030 	if (ddi_copyin(arg, STRUCT_BUF(uparms), STRUCT_SIZE(uparms), mode)) {
6031 		return (EFAULT);
6032 	}
6033 
6034 	STRUCT_SET_HANDLE(rs, mode, STRUCT_FGETP(uparms, rdc_set));
6035 	STRUCT_SET_HANDLE(pa, mode, STRUCT_FADDR(rs, primary));
6036 	STRUCT_SET_HANDLE(sa, mode, STRUCT_FADDR(rs, secondary));
6037 	cmd = STRUCT_FGET(uparms, command);
6038 	if (cmd == RDC_CMD_ENABLE || cmd == RDC_CMD_RESUME) {
6039 		fsvaddr.len = STRUCT_FGET(pa, addr.len);
6040 		fsvaddr.maxlen = STRUCT_FGET(pa, addr.maxlen);
6041 		fsvaddr.buf =  kmem_zalloc(fsvaddr.len, KM_SLEEP);
6042 
6043 		if (ddi_copyin(STRUCT_FGETP(pa, addr.buf),
6044 		    fsvaddr.buf, fsvaddr.len, mode)) {
6045 			kmem_free(fsvaddr.buf, fsvaddr.len);
6046 #ifdef DEBUG
6047 			cmn_err(CE_WARN, "!copyin failed primary.addr 2");
6048 #endif
6049 			return (EFAULT);
6050 		}
6051 
6052 
6053 		tsvaddr.len = STRUCT_FGET(sa, addr.len);
6054 		tsvaddr.maxlen = STRUCT_FGET(sa, addr.maxlen);
6055 		tsvaddr.buf =  kmem_zalloc(tsvaddr.len, KM_SLEEP);
6056 
6057 		if (ddi_copyin(STRUCT_FGETP(sa, addr.buf),
6058 		    tsvaddr.buf, tsvaddr.len, mode)) {
6059 #ifdef DEBUG
6060 			cmn_err(CE_WARN, "!copyin failed secondary addr");
6061 #endif
6062 			kmem_free(fsvaddr.buf, fsvaddr.len);
6063 			kmem_free(tsvaddr.buf, tsvaddr.len);
6064 			return (EFAULT);
6065 		}
6066 	} else {
6067 		fsvaddr.len = 0;
6068 		fsvaddr.maxlen = 0;
6069 		fsvaddr.buf =  kmem_zalloc(fsvaddr.len, KM_SLEEP);
6070 		tsvaddr.len = 0;
6071 		tsvaddr.maxlen = 0;
6072 		tsvaddr.buf =  kmem_zalloc(tsvaddr.len, KM_SLEEP);
6073 	}
6074 
6075 	if (STRUCT_FGETP(uparms, rdc_set->netconfig) != NULL) {
6076 		STRUCT_INIT(knconf_tmp, mode);
6077 		knconf = kmem_zalloc(sizeof (*knconf), KM_SLEEP);
6078 		if (ddi_copyin(STRUCT_FGETP(uparms, rdc_set->netconfig),
6079 		    STRUCT_BUF(knconf_tmp), STRUCT_SIZE(knconf_tmp), mode)) {
6080 #ifdef DEBUG
6081 			cmn_err(CE_WARN, "!copyin failed netconfig");
6082 #endif
6083 			kmem_free(fsvaddr.buf, fsvaddr.len);
6084 			kmem_free(tsvaddr.buf, tsvaddr.len);
6085 			kmem_free(knconf, sizeof (*knconf));
6086 			return (EFAULT);
6087 		}
6088 
6089 		knconf->knc_semantics = STRUCT_FGET(knconf_tmp, knc_semantics);
6090 		knconf->knc_protofmly = STRUCT_FGETP(knconf_tmp, knc_protofmly);
6091 		knconf->knc_proto = STRUCT_FGETP(knconf_tmp, knc_proto);
6092 
6093 #ifndef _SunOS_5_6
6094 		if ((mode & DATAMODEL_LP64) == 0) {
6095 			knconf->knc_rdev =
6096 			    expldev(STRUCT_FGET(knconf_tmp, knc_rdev));
6097 		} else {
6098 #endif
6099 			knconf->knc_rdev = STRUCT_FGET(knconf_tmp, knc_rdev);
6100 #ifndef _SunOS_5_6
6101 		}
6102 #endif
6103 
6104 		pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
6105 		p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
6106 		rc = ddi_copyin(knconf->knc_protofmly, pf, KNC_STRSIZE, mode);
6107 		if (rc) {
6108 #ifdef DEBUG
6109 			cmn_err(CE_WARN, "!copyin failed parms protofmly");
6110 #endif
6111 			rc = EFAULT;
6112 			goto out;
6113 		}
6114 		rc = ddi_copyin(knconf->knc_proto, p, KNC_STRSIZE, mode);
6115 		if (rc) {
6116 #ifdef DEBUG
6117 			cmn_err(CE_WARN, "!copyin failed parms proto");
6118 #endif
6119 			rc = EFAULT;
6120 			goto out;
6121 		}
6122 		knconf->knc_protofmly = pf;
6123 		knconf->knc_proto = p;
6124 	} /* !NULL netconfig */
6125 
6126 	uap = kmem_alloc(sizeof (*uap), KM_SLEEP);
6127 
6128 	/* copy relevant parts of rdc_config to uap field by field */
6129 
6130 	(void) strncpy(uap->rdc_set[0].primary.intf, STRUCT_FGETP(pa, intf),
6131 	    MAX_RDC_HOST_SIZE);
6132 	(void) strncpy(uap->rdc_set[0].primary.file, STRUCT_FGETP(pa, file),
6133 	    NSC_MAXPATH);
6134 	(void) strncpy(uap->rdc_set[0].primary.bitmap, STRUCT_FGETP(pa, bitmap),
6135 	    NSC_MAXPATH);
6136 	uap->rdc_set[0].netconfig = knconf;
6137 	uap->rdc_set[0].flags = STRUCT_FGET(uparms, rdc_set->flags);
6138 	uap->rdc_set[0].index = STRUCT_FGET(uparms, rdc_set->index);
6139 	uap->rdc_set[0].setid = STRUCT_FGET(uparms, rdc_set->setid);
6140 	uap->rdc_set[0].sync_pos = STRUCT_FGET(uparms, rdc_set->sync_pos);
6141 	uap->rdc_set[0].volume_size = STRUCT_FGET(uparms, rdc_set->volume_size);
6142 	uap->rdc_set[0].bits_set = STRUCT_FGET(uparms, rdc_set->bits_set);
6143 	uap->rdc_set[0].autosync = STRUCT_FGET(uparms, rdc_set->autosync);
6144 	uap->rdc_set[0].maxqfbas = STRUCT_FGET(uparms, rdc_set->maxqfbas);
6145 	uap->rdc_set[0].maxqitems = STRUCT_FGET(uparms, rdc_set->maxqitems);
6146 	uap->rdc_set[0].asyncthr = STRUCT_FGET(uparms, rdc_set->asyncthr);
6147 	uap->rdc_set[0].syshostid = STRUCT_FGET(uparms, rdc_set->syshostid);
6148 	uap->rdc_set[0].primary.addr = fsvaddr;		/* struct copy */
6149 	uap->rdc_set[0].secondary.addr = tsvaddr;	/* struct copy */
6150 
6151 	(void) strncpy(uap->rdc_set[0].secondary.intf, STRUCT_FGETP(sa, intf),
6152 	    MAX_RDC_HOST_SIZE);
6153 	(void) strncpy(uap->rdc_set[0].secondary.file, STRUCT_FGETP(sa, file),
6154 	    NSC_MAXPATH);
6155 	(void) strncpy(uap->rdc_set[0].secondary.bitmap,
6156 	    STRUCT_FGETP(sa, bitmap), NSC_MAXPATH);
6157 
6158 	(void) strncpy(uap->rdc_set[0].direct_file,
6159 	    STRUCT_FGETP(rs, direct_file), NSC_MAXPATH);
6160 
6161 	(void) strncpy(uap->rdc_set[0].group_name, STRUCT_FGETP(rs, group_name),
6162 	    NSC_MAXPATH);
6163 
6164 	(void) strncpy(uap->rdc_set[0].disk_queue, STRUCT_FGETP(rs, disk_queue),
6165 	    NSC_MAXPATH);
6166 
6167 	uap->command = STRUCT_FGET(uparms, command);
6168 	uap->options = STRUCT_FGET(uparms, options);
6169 
6170 	enable = (uap->command == RDC_CMD_ENABLE ||
6171 	    uap->command == RDC_CMD_RESUME);
6172 	disable = (uap->command == RDC_CMD_DISABLE ||
6173 	    uap->command == RDC_CMD_SUSPEND);
6174 
6175 	/*
6176 	 * Initialise the threadset if it has not already been done.
6177 	 *
6178 	 * This has to be done now, not in rdcattach(), because
6179 	 * rdcattach() can be called before nskernd is running (eg.
6180 	 * boot -r) in which case the nst_init() would fail and hence
6181 	 * the attach would fail.
6182 	 *
6183 	 * Threadset creation is locked by the rdc_conf_lock,
6184 	 * destruction is inherently single threaded as it is done in
6185 	 * _rdc_unload() which must be the last thing performed by
6186 	 * rdcdetach().
6187 	 */
6188 
6189 	if (enable && _rdc_ioset == NULL) {
6190 		mutex_enter(&rdc_conf_lock);
6191 
6192 		if (_rdc_ioset == NULL) {
6193 			rc = rdc_thread_configure();
6194 		}
6195 
6196 		mutex_exit(&rdc_conf_lock);
6197 
6198 		if (rc || _rdc_ioset == NULL) {
6199 			spcs_s_add(kstatus, RDC_ENOTHREADS);
6200 			rc = RDC_ENOTHREADS;
6201 			goto outuap;
6202 		}
6203 	}
6204 	switch (uap->command) {
6205 	case RDC_CMD_ENABLE:
6206 		rc = rdc_enable(uap, kstatus);
6207 		break;
6208 	case RDC_CMD_DISABLE:
6209 		rc = rdc_disable(uap, kstatus);
6210 		break;
6211 	case RDC_CMD_COPY:
6212 		rc = rdc_sync(uap, kstatus);
6213 		break;
6214 	case RDC_CMD_LOG:
6215 		rc = rdc_log(uap, kstatus);
6216 		break;
6217 	case RDC_CMD_RECONFIG:
6218 		rc = rdc_reconfig(uap, kstatus);
6219 		break;
6220 	case RDC_CMD_RESUME:
6221 		rc = rdc_resume(uap, kstatus);
6222 		break;
6223 	case RDC_CMD_SUSPEND:
6224 		rc = rdc_suspend(uap, kstatus);
6225 		break;
6226 	case RDC_CMD_TUNABLE:
6227 		rc = rdc_tunable(uap, kstatus);
6228 		break;
6229 	case RDC_CMD_WAIT:
6230 		rc = rdc_wait(uap, kstatus);
6231 		break;
6232 	case RDC_CMD_HEALTH:
6233 		rc = rdc_health(uap, kstatus, rvp);
6234 		break;
6235 	case RDC_CMD_STATUS:
6236 		rc = rdc_status(arg, mode, uap, kstatus);
6237 		break;
6238 	case RDC_CMD_RESET:
6239 		rc = rdc_reset(uap, kstatus);
6240 		break;
6241 	case RDC_CMD_ADDQ:
6242 		rc = rdc_add_diskq(uap, kstatus);
6243 		break;
6244 	case RDC_CMD_REMQ:
6245 		if ((rc = rdc_rem_diskq(uap, kstatus)) != 0)
6246 			break;
6247 		/* FALLTHRU */
6248 	case RDC_CMD_KILLQ:
6249 		rc = rdc_kill_diskq(uap, kstatus);
6250 		break;
6251 	case RDC_CMD_INITQ:
6252 		rc = rdc_init_diskq(uap, kstatus);
6253 		break;
6254 
6255 	default:
6256 		rc = EINVAL;
6257 		break;
6258 	}
6259 
6260 	/*
6261 	 * Tune the threadset size after a successful rdc_set addition
6262 	 * or removal.
6263 	 */
6264 	if ((enable || disable) && rc == 0) {
6265 		mutex_enter(&rdc_conf_lock);
6266 		rdc_thread_tune(enable ? 2 : -2);
6267 		mutex_exit(&rdc_conf_lock);
6268 	}
6269 outuap:
6270 	kmem_free(uap, sizeof (*uap));
6271 out:
6272 	kmem_free(fsvaddr.buf, fsvaddr.len);
6273 	kmem_free(tsvaddr.buf, tsvaddr.len);
6274 	if (pf)
6275 		kmem_free(pf, KNC_STRSIZE);
6276 	if (p)
6277 		kmem_free(p, KNC_STRSIZE);
6278 	if (knconf)
6279 		kmem_free(knconf, sizeof (*knconf));
6280 	return (rc);
6281 }
6282 
6283 
6284 /*
6285  * krdc->group->lock held on entry to halt_sync()
6286  */
6287 static void
6288 halt_sync(rdc_k_info_t *krdc)
6289 {
6290 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
6291 
6292 	ASSERT(MUTEX_HELD(&krdc->group->lock));
6293 	ASSERT(IS_ENABLED(urdc));
6294 
6295 	/*
6296 	 * If a sync is in progress, halt it
6297 	 */
6298 	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
6299 	    (krdc->aux_state & RDC_AUXSYNCIP)) {
6300 		krdc->disk_status = 1;
6301 
6302 		while (krdc->disk_status == 1) {
6303 			if (cv_wait_sig(&krdc->haltcv, &krdc->group->lock) == 0)
6304 				break;
6305 		}
6306 	}
6307 }
6308 
6309 /*
6310  * return size in blocks
6311  */
6312 uint64_t
6313 mirror_getsize(int index)
6314 {
6315 	rdc_k_info_t *krdc;
6316 	rdc_u_info_t *urdc;
6317 	int rc, rs;
6318 	nsc_size_t size;
6319 
6320 	krdc = &rdc_k_info[index];
6321 	urdc = &rdc_u_info[index];
6322 
6323 	rc = _rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL);
6324 	rs = nsc_partsize(RDC_U_FD(krdc), &size);
6325 	urdc->volume_size = size;
6326 	if (rc == 0)
6327 		_rdc_rlse_devs(krdc, RDC_RAW);
6328 
6329 	return (rs == 0 ? urdc->volume_size : 0);
6330 }
6331 
6332 
6333 /*
6334  * Create a new dataset for this transfer, and add it to the list
6335  * of datasets via the net_dataset pointer in the krdc.
6336  */
6337 rdc_net_dataset_t *
6338 rdc_net_add_set(int index)
6339 {
6340 	rdc_k_info_t *krdc;
6341 	rdc_u_info_t *urdc;
6342 	rdc_net_dataset_t *dset;
6343 
6344 	if (index >= rdc_max_sets) {
6345 		cmn_err(CE_NOTE, "!rdc_net_add_set: bad index %d", index);
6346 		return (NULL);
6347 	}
6348 	krdc = &rdc_k_info[index];
6349 	urdc = &rdc_u_info[index];
6350 
6351 	dset = kmem_alloc(sizeof (*dset), KM_NOSLEEP);
6352 	if (dset == NULL) {
6353 		cmn_err(CE_NOTE, "!rdc_net_add_set: kmem_alloc failed");
6354 		return (NULL);
6355 	}
6356 	RDC_DSMEMUSE(sizeof (*dset));
6357 	dset->inuse = 1;
6358 	dset->nitems = 0;
6359 	dset->delpend = 0;
6360 	dset->head = NULL;
6361 	dset->tail = NULL;
6362 	mutex_enter(&krdc->dc_sleep);
6363 
6364 	if (!IS_ENABLED(urdc)) {
6365 		/* raced with a disable command */
6366 		kmem_free(dset, sizeof (*dset));
6367 		RDC_DSMEMUSE(-sizeof (*dset));
6368 		mutex_exit(&krdc->dc_sleep);
6369 		return (NULL);
6370 	}
6371 	/*
6372 	 * Shared the id generator, (and the locks).
6373 	 */
6374 	mutex_enter(&rdc_net_hnd_id_lock);
6375 	if (++rdc_net_hnd_id == 0)
6376 		rdc_net_hnd_id = 1;
6377 	dset->id = rdc_net_hnd_id;
6378 	mutex_exit(&rdc_net_hnd_id_lock);
6379 
6380 #ifdef DEBUG
6381 	if (krdc->net_dataset != NULL) {
6382 		rdc_net_dataset_t *dset2;
6383 		for (dset2 = krdc->net_dataset; dset2; dset2 = dset2->next) {
6384 			if (dset2->id == dset->id) {
6385 				cmn_err(CE_PANIC,
6386 				    "rdc_net_add_set duplicate id %p:%d %p:%d",
6387 				    (void *)dset, dset->id,
6388 				    (void *)dset2, dset2->id);
6389 			}
6390 		}
6391 	}
6392 #endif
6393 	dset->next = krdc->net_dataset;
6394 	krdc->net_dataset = dset;
6395 	mutex_exit(&krdc->dc_sleep);
6396 
6397 	return (dset);
6398 }
6399 
6400 /*
6401  * fetch the previously added dataset.
6402  */
6403 rdc_net_dataset_t *
6404 rdc_net_get_set(int index, int id)
6405 {
6406 	rdc_k_info_t *krdc;
6407 	rdc_net_dataset_t *dset;
6408 
6409 	if (index >= rdc_max_sets) {
6410 		cmn_err(CE_NOTE, "!rdc_net_get_set: bad index %d", index);
6411 		return (NULL);
6412 	}
6413 	krdc = &rdc_k_info[index];
6414 
6415 	mutex_enter(&krdc->dc_sleep);
6416 
6417 	dset = krdc->net_dataset;
6418 	while (dset && (dset->id != id))
6419 		dset = dset->next;
6420 
6421 	if (dset) {
6422 		dset->inuse++;
6423 	}
6424 
6425 	mutex_exit(&krdc->dc_sleep);
6426 	return (dset);
6427 }
6428 
6429 /*
6430  * Decrement the inuse counter. Data may be freed.
6431  */
6432 void
6433 rdc_net_put_set(int index, rdc_net_dataset_t *dset)
6434 {
6435 	rdc_k_info_t *krdc;
6436 
6437 	if (index >= rdc_max_sets) {
6438 		cmn_err(CE_NOTE, "!rdc_net_put_set: bad index %d", index);
6439 		return;
6440 	}
6441 	krdc = &rdc_k_info[index];
6442 
6443 	mutex_enter(&krdc->dc_sleep);
6444 	dset->inuse--;
6445 	ASSERT(dset->inuse >= 0);
6446 	if ((dset->inuse == 0) && (dset->delpend)) {
6447 		rdc_net_free_set(krdc, dset);
6448 	}
6449 	mutex_exit(&krdc->dc_sleep);
6450 }
6451 
6452 /*
6453  * Mark that we are finished with this set. Decrement inuse
6454  * counter, mark as needing deletion, and
6455  * remove from linked list.
6456  */
6457 void
6458 rdc_net_del_set(int index, rdc_net_dataset_t *dset)
6459 {
6460 	rdc_k_info_t *krdc;
6461 
6462 	if (index >= rdc_max_sets) {
6463 		cmn_err(CE_NOTE, "!rdc_net_del_set: bad index %d", index);
6464 		return;
6465 	}
6466 	krdc = &rdc_k_info[index];
6467 
6468 	mutex_enter(&krdc->dc_sleep);
6469 	dset->inuse--;
6470 	ASSERT(dset->inuse >= 0);
6471 	dset->delpend = 1;
6472 	if (dset->inuse == 0) {
6473 		rdc_net_free_set(krdc, dset);
6474 	}
6475 	mutex_exit(&krdc->dc_sleep);
6476 }
6477 
6478 /*
6479  * free all the memory associated with this set, and remove from
6480  * list.
6481  * Enters and exits with dc_sleep lock held.
6482  */
6483 
6484 void
6485 rdc_net_free_set(rdc_k_info_t *krdc, rdc_net_dataset_t *dset)
6486 {
6487 	rdc_net_dataset_t **dsetp;
6488 #ifdef DEBUG
6489 	int found = 0;
6490 #endif
6491 
6492 	ASSERT(MUTEX_HELD(&krdc->dc_sleep));
6493 	ASSERT(dset);
6494 	for (dsetp = &krdc->net_dataset; *dsetp; dsetp = &((*dsetp)->next)) {
6495 		if (*dsetp == dset) {
6496 			*dsetp = dset->next;
6497 #ifdef DEBUG
6498 			found = 1;
6499 #endif
6500 			break;
6501 		}
6502 	}
6503 
6504 #ifdef DEBUG
6505 	if (found == 0) {
6506 		cmn_err(CE_WARN, "!rdc_net_free_set: Unable to find "
6507 		    "dataset 0x%p in krdc list", (void *)dset);
6508 	}
6509 #endif
6510 	/*
6511 	 * unlinked from list. Free all the data
6512 	 */
6513 	rdc_ditemsfree(dset);
6514 	/*
6515 	 * free my core.
6516 	 */
6517 	kmem_free(dset, sizeof (*dset));
6518 	RDC_DSMEMUSE(-sizeof (*dset));
6519 }
6520 
6521 
6522 /*
6523  * Free all the dataitems and the data it points to.
6524  */
6525 static void
6526 rdc_ditemsfree(rdc_net_dataset_t *dset)
6527 {
6528 	rdc_net_dataitem_t *ditem;
6529 	rdc_net_dataitem_t *nitem;
6530 
6531 	ditem = dset->head;
6532 
6533 	while (ditem) {
6534 		nitem = ditem->next;
6535 		kmem_free(ditem->dptr, ditem->mlen);
6536 		RDC_DSMEMUSE(-ditem->mlen);
6537 		dset->nitems--;
6538 		kmem_free(ditem, sizeof (*ditem));
6539 		RDC_DSMEMUSE(-sizeof (*ditem));
6540 		ditem = nitem;
6541 	}
6542 	ASSERT(dset->nitems == 0);
6543 }
6544 
6545 /*
6546  * allocate and initialize a rdc_aio_t
6547  */
6548 rdc_aio_t *
6549 rdc_aio_tbuf_get(void *n, void *h, int pos, int len, int flag, int index, int s)
6550 {
6551 	rdc_aio_t *p;
6552 
6553 	p = kmem_zalloc(sizeof (rdc_aio_t), KM_NOSLEEP);
6554 	if (p == NULL) {
6555 #ifdef DEBUG
6556 		cmn_err(CE_NOTE, "!_rdcaiotbufget: kmem_alloc failed bp aio");
6557 #endif
6558 		return (NULL);
6559 	} else {
6560 		p->next = n; /* overload */
6561 		p->handle = h;
6562 		p->pos = pos;
6563 		p->qpos = -1;
6564 		p->len = len;
6565 		p->flag = flag;
6566 		p->index = index;
6567 		p->iostatus = s; /* overload */
6568 		/* set up seq later, in case thr create fails */
6569 	}
6570 	return (p);
6571 }
6572 
6573 /*
6574  * rdc_aio_buf_get
6575  * get an aio_buf
6576  */
6577 aio_buf_t *
6578 rdc_aio_buf_get(rdc_buf_t *h, int index)
6579 {
6580 	aio_buf_t *p;
6581 
6582 	if (index >= rdc_max_sets) {
6583 		cmn_err(CE_NOTE, "!rdc: rdc_aio_buf_get bad index %x", index);
6584 		return (NULL);
6585 	}
6586 
6587 	mutex_enter(&h->aio_lock);
6588 
6589 	p = h->rdc_anon;
6590 	while (p && (p->kindex != index))
6591 		p = p->next;
6592 
6593 	mutex_exit(&h->aio_lock);
6594 	return (p);
6595 }
6596 
6597 /*
6598  * rdc_aio_buf_del
6599  * delete a aio_buf
6600  */
6601 void
6602 rdc_aio_buf_del(rdc_buf_t *h, rdc_k_info_t *krdc)
6603 {
6604 	aio_buf_t *p, **pp;
6605 
6606 	mutex_enter(&h->aio_lock);
6607 
6608 	p = NULL;
6609 	for (pp = &h->rdc_anon; *pp; pp = &((*pp)->next)) {
6610 		if ((*pp)->kindex == krdc->index) {
6611 			p = *pp;
6612 			break;
6613 		}
6614 	}
6615 
6616 	if (p) {
6617 		*pp = p->next;
6618 		kmem_free(p, sizeof (*p));
6619 	}
6620 	mutex_exit(&h->aio_lock);
6621 }
6622 
6623 /*
6624  * rdc_aio_buf_add
6625  * Add a aio_buf.
6626  */
6627 aio_buf_t *
6628 rdc_aio_buf_add(int index, rdc_buf_t *h)
6629 {
6630 	aio_buf_t *p;
6631 
6632 	p = kmem_zalloc(sizeof (*p), KM_NOSLEEP);
6633 	if (p == NULL) {
6634 		cmn_err(CE_NOTE, "!rdc_aio_buf_add: kmem_alloc failed");
6635 		return (NULL);
6636 	}
6637 
6638 	p->rdc_abufp = NULL;
6639 	p->kindex = index;
6640 
6641 	mutex_enter(&h->aio_lock);
6642 	p->next = h->rdc_anon;
6643 	h->rdc_anon = p;
6644 	mutex_exit(&h->aio_lock);
6645 	return (p);
6646 }
6647 
6648 /*
6649  * kmemalloc a new group structure and setup the common
6650  * fields.
6651  */
6652 static rdc_group_t *
6653 rdc_newgroup()
6654 {
6655 	rdc_group_t *group;
6656 
6657 	group = kmem_zalloc(sizeof (rdc_group_t), KM_SLEEP);
6658 	group->diskq.lastio = kmem_zalloc(sizeof (rdc_aio_t), KM_SLEEP);
6659 	group->count = 1;
6660 	group->seq = RDC_NEWSEQ;
6661 	group->seqack = RDC_NEWSEQ;
6662 	mutex_init(&group->lock, NULL, MUTEX_DRIVER, NULL);
6663 	mutex_init(&group->ra_queue.net_qlock, NULL, MUTEX_DRIVER, NULL);
6664 	mutex_init(&group->diskqmutex, NULL, MUTEX_DRIVER, NULL);
6665 	mutex_init(&group->diskq.disk_qlock, NULL, MUTEX_DRIVER, NULL);
6666 	mutex_init(&group->diskq.head_lock, NULL, MUTEX_DRIVER, NULL);
6667 	mutex_init(&group->addthrnumlk, NULL, MUTEX_DRIVER, NULL);
6668 	cv_init(&group->unregistercv, NULL, CV_DRIVER, NULL);
6669 	cv_init(&group->asyncqcv, NULL, CV_DRIVER, NULL);
6670 	cv_init(&group->diskq.busycv, NULL, CV_DRIVER, NULL);
6671 	cv_init(&group->diskq.qfullcv, NULL, CV_DRIVER, NULL);
6672 	cv_init(&group->ra_queue.qfcv, NULL, CV_DRIVER, NULL);
6673 	group->ra_queue.qfill_sleeping = RDC_QFILL_DEAD;
6674 	group->diskq.busycnt = 0;
6675 	ASSERT(group->synccount == 0);		/* group was kmem_zalloc'ed */
6676 
6677 	/*
6678 	 * add default number of threads to the flusher thread set, plus
6679 	 * one extra thread for the disk queue flusher
6680 	 */
6681 	if (nst_add_thread(_rdc_flset, 3) != 3)
6682 		cmn_err(CE_NOTE, "!rdc_newgroup: nst_add_thread failed");
6683 
6684 	return (group);
6685 }
6686 
6687 void
6688 rdc_delgroup(rdc_group_t *group)
6689 {
6690 
6691 	ASSERT(group->asyncstall == 0);
6692 	ASSERT(group->rdc_thrnum == 0);
6693 	ASSERT(group->count == 0);
6694 	ASSERT(MUTEX_HELD(&rdc_many_lock));
6695 
6696 	mutex_enter(&group->ra_queue.net_qlock);
6697 	rdc_sleepqdiscard(group);
6698 	mutex_exit(&group->ra_queue.net_qlock);
6699 
6700 	/* try to remove flusher threads that this group added to _rdc_flset */
6701 	if (nst_del_thread(_rdc_flset, group->rdc_addthrnum + 3) !=
6702 	    group->rdc_addthrnum + 3)
6703 		cmn_err(CE_NOTE, "!rdc_delgroup: nst_del_thread failed");
6704 
6705 	mutex_destroy(&group->lock);
6706 	mutex_destroy(&group->ra_queue.net_qlock);
6707 	mutex_destroy(&group->diskqmutex);
6708 	mutex_destroy(&group->diskq.disk_qlock);
6709 	mutex_destroy(&group->diskq.head_lock);
6710 	mutex_destroy(&group->addthrnumlk);
6711 	cv_destroy(&group->unregistercv);
6712 	cv_destroy(&group->asyncqcv);
6713 	cv_destroy(&group->diskq.busycv);
6714 	cv_destroy(&group->diskq.qfullcv);
6715 	cv_destroy(&group->ra_queue.qfcv);
6716 	kmem_free(group->diskq.lastio, sizeof (rdc_aio_t));
6717 	kmem_free(group, sizeof (rdc_group_t));
6718 }
6719