xref: /titanic_52/usr/src/uts/common/avs/ns/rdc/rdc_diskq.c (revision 3270659f55e0928d6edec3d26217cc29398a8149)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/ksynch.h>
28 #include <sys/cmn_err.h>
29 #include <sys/kmem.h>
30 #include <sys/stat.h>
31 #include <sys/errno.h>
32 
33 #include "../solaris/nsc_thread.h"
34 #ifdef DS_DDICT
35 #include "../contract.h"
36 #endif
37 #include <sys/nsctl/nsctl.h>
38 
39 #include <sys/kmem.h>
40 #include <sys/ddi.h>
41 
42 #include <sys/sdt.h>		/* dtrace is S10 or later */
43 
44 #include "rdc_io.h"
45 #include "rdc_bitmap.h"
46 #include "rdc_diskq.h"
47 #include "rdc_clnt.h"
48 
49 #include <sys/unistat/spcs_s.h>
50 #include <sys/unistat/spcs_s_k.h>
51 #include <sys/unistat/spcs_errors.h>
52 
/*
 * nsc_io_t handle defined outside this file; it is handed to
 * nsc_register_path() when the disk queue path is registered.
 */
extern nsc_io_t *_rdc_io_hc;

/*
 * Global knob, 0 by default.  Not referenced in this part of the file;
 * presumably it controls coalescing of queued i/o -- confirm against
 * its users.
 */
int rdc_diskq_coalesce = 0;
56 
57 int
58 _rdc_rsrv_diskq(rdc_group_t *group)
59 {
60 	int rc = 0;
61 
62 	mutex_enter(&group->diskqmutex);
63 	if (group->diskqfd == NULL) {
64 		mutex_exit(&group->diskqmutex);
65 		return (EIO);
66 	} else if ((group->diskqrsrv == 0) &&
67 	    (rc = nsc_reserve(group->diskqfd, 0)) != 0) {
68 		cmn_err(CE_WARN,
69 		    "!rdc: nsc_reserve(%s) failed %d\n",
70 		    nsc_pathname(group->diskqfd), rc);
71 	} else {
72 		group->diskqrsrv++;
73 	}
74 
75 	mutex_exit(&group->diskqmutex);
76 	return (rc);
77 }
78 
79 void
80 _rdc_rlse_diskq(rdc_group_t *group)
81 {
82 	mutex_enter(&group->diskqmutex);
83 	if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) {
84 		nsc_release(group->diskqfd);
85 	}
86 	mutex_exit(&group->diskqmutex);
87 }
88 
89 void
90 rdc_wait_qbusy(disk_queue *q)
91 {
92 	ASSERT(MUTEX_HELD(QLOCK(q)));
93 	while (q->busycnt > 0)
94 		cv_wait(&q->busycv, QLOCK(q));
95 }
96 
97 void
98 rdc_set_qbusy(disk_queue *q)
99 {
100 	ASSERT(MUTEX_HELD(QLOCK(q)));
101 	q->busycnt++;
102 }
103 
104 void
105 rdc_clr_qbusy(disk_queue *q)
106 {
107 	ASSERT(MUTEX_HELD(QLOCK(q)));
108 	q->busycnt--;
109 	if (q->busycnt == 0)
110 		cv_broadcast(&q->busycv);
111 }
112 
113 int
114 rdc_lookup_diskq(char *pathname)
115 {
116 	rdc_u_info_t *urdc;
117 #ifdef DEBUG
118 	rdc_k_info_t *krdc;
119 #endif
120 	int index;
121 
122 	for (index = 0; index < rdc_max_sets; index++) {
123 		urdc = &rdc_u_info[index];
124 #ifdef DEBUG
125 		krdc = &rdc_k_info[index];
126 #endif
127 		ASSERT(krdc->index == index);
128 		ASSERT(urdc->index == index);
129 		if (!IS_ENABLED(urdc))
130 			continue;
131 
132 		if (strncmp(pathname, urdc->disk_queue,
133 		    NSC_MAXPATH) == 0)
134 			return (index);
135 	}
136 
137 	return (-1);
138 }
139 
140 void
141 rdc_unintercept_diskq(rdc_group_t *grp)
142 {
143 	if (!RDC_IS_DISKQ(grp))
144 		return;
145 	if (grp->q_tok)
146 		(void) nsc_unregister_path(grp->q_tok, 0);
147 	grp->q_tok = NULL;
148 }
149 
150 void
151 rdc_close_diskq(rdc_group_t *grp)
152 {
153 
154 	if (grp == NULL) {
155 #ifdef DEBUG
156 		cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!");
157 #endif
158 		return;
159 	}
160 
161 	if (grp->diskqfd) {
162 		if (nsc_close(grp->diskqfd) != 0) {
163 #ifdef DEBUG
164 			cmn_err(CE_WARN, "!nsc_close on diskq failed");
165 #else
166 			;
167 			/*EMPTY*/
168 #endif
169 		}
170 		grp->diskqfd = 0;
171 		grp->diskqrsrv = 0;
172 	}
173 	bzero(&grp->diskq.disk_hdr, sizeof (diskq_header));
174 }
175 
176 /*
177  * nsc_open the diskq and attach
178  * the nsc_fd to krdc->diskqfd
179  */
180 int
181 rdc_open_diskq(rdc_k_info_t *krdc)
182 {
183 	rdc_u_info_t *urdc;
184 	rdc_group_t *grp;
185 	int sts;
186 	nsc_size_t size;
187 	char *diskqname;
188 	int mutexheld = 0;
189 
190 	grp = krdc->group;
191 	urdc = &rdc_u_info[krdc->index];
192 
193 	mutex_enter(&grp->diskqmutex);
194 	mutexheld++;
195 	if (&urdc->disk_queue[0] == '\0') {
196 		goto fail;
197 	}
198 
199 	diskqname = &urdc->disk_queue[0];
200 
201 	if (grp->diskqfd == NULL) {
202 		grp->diskqfd = nsc_open(diskqname,
203 		    NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0);
204 		if (grp->diskqfd == NULL) {
205 			cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s",
206 			    diskqname);
207 			goto fail;
208 		}
209 	}
210 	if (!grp->q_tok)
211 		grp->q_tok = nsc_register_path(urdc->disk_queue,
212 		    NSC_DEVICE | NSC_CACHE, _rdc_io_hc);
213 
214 	grp->diskqrsrv = 0; /* init reserve count */
215 
216 	mutex_exit(&grp->diskqmutex);
217 	mutexheld--;
218 	/* just test a reserve release */
219 	sts = _rdc_rsrv_diskq(grp);
220 	if (!RDC_SUCCESS(sts)) {
221 		cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s",
222 		    diskqname);
223 		goto fail;
224 	}
225 	sts = nsc_partsize(grp->diskqfd, &size);
226 	_rdc_rlse_diskq(grp);
227 
228 	if ((sts == 0) && (size < 1)) {
229 		rdc_unintercept_diskq(grp);
230 		rdc_close_diskq(grp);
231 		goto fail;
232 	}
233 
234 	return (0);
235 
236 fail:
237 	bzero(&urdc->disk_queue, NSC_MAXPATH);
238 	if (mutexheld)
239 		mutex_exit(&grp->diskqmutex);
240 	return (-1);
241 
242 }
243 
244 /*
245  * rdc_count_vecs
246  * simply vec++'s until sb_addr is null
247  * returns number of vectors encountered
248  */
249 int
250 rdc_count_vecs(nsc_vec_t *vec)
251 {
252 	nsc_vec_t	*vecp;
253 	int i = 0;
254 	vecp = vec;
255 	while (vecp->sv_addr) {
256 		vecp++;
257 		i++;
258 	}
259 	return (i+1);
260 }
261 /*
262  * rdc_setid2idx
263  * given setid, return index
264  */
265 int
266 rdc_setid2idx(int setid) {
267 
268 	int index = 0;
269 
270 	for (index = 0; index < rdc_max_sets; index++) {
271 		if (rdc_u_info[index].setid == setid)
272 			break;
273 	}
274 	if (index >= rdc_max_sets)
275 		index = -1;
276 	return (index);
277 }
278 
279 /*
280  * rdc_idx2setid
281  * given an index, return its setid
282  */
283 int
284 rdc_idx2setid(int index)
285 {
286 	return (rdc_u_info[index].setid);
287 }
288 
289 /*
290  * rdc_fill_ioheader
291  * fill in all the stuff you want to save on disk
292  * at the beginnig of each queued write
293  */
294 void
295 rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos)
296 {
297 	ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock));
298 
299 	hd->dat.magic = RDC_IOHDR_MAGIC;
300 	hd->dat.type = RDC_QUEUEIO;
301 	hd->dat.pos = aio->pos;
302 	hd->dat.hpos = aio->pos;
303 	hd->dat.qpos = qpos;
304 	hd->dat.len = aio->len;
305 	hd->dat.flag = aio->flag;
306 	hd->dat.iostatus = aio->iostatus;
307 	hd->dat.setid = rdc_idx2setid(aio->index);
308 	hd->dat.time = nsc_time();
309 	if (!aio->handle)
310 		hd->dat.flag |= RDC_NULL_BUF; /* no real data to queue */
311 }
312 
313 /*
314  * rdc_dump_iohdrs
315  * give back the iohdr list
316  * and clear out q->lastio
317  */
318 void
319 rdc_dump_iohdrs(disk_queue *q)
320 {
321 	io_hdr *p, *r;
322 
323 	ASSERT(MUTEX_HELD(QLOCK(q)));
324 
325 	p = q->iohdrs;
326 	while (p) {
327 		r = p->dat.next;
328 		kmem_free(p, sizeof (*p));
329 		q->hdrcnt--;
330 		p = r;
331 	}
332 	q->iohdrs = q->hdr_last = NULL;
333 	q->hdrcnt = 0;
334 	if (q->lastio->handle)
335 		(void) nsc_free_buf(q->lastio->handle);
336 	bzero(&(*q->lastio), sizeof (*q->lastio));
337 }
338 
339 /*
340  * rdc_fail_diskq
341  * set flags, throw away q info
342  * clean up what you can
343  * wait for flusher threads to stop (taking into account this may be one)
344  * takes group_lock, so conf, many, and bitmap may not be held
345  */
346 void
347 rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag)
348 {
349 	rdc_k_info_t *p;
350 	rdc_u_info_t *q = &rdc_u_info[krdc->index];
351 	rdc_group_t *group = krdc->group;
352 	disk_queue *dq = &krdc->group->diskq;
353 
354 	if (IS_STATE(q, RDC_DISKQ_FAILED))
355 		return;
356 
357 	if (!(flag & RDC_NOFAIL))
358 		cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue);
359 
360 	if (flag & RDC_DOLOG) {
361 		rdc_group_enter(krdc);
362 		rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
363 		    "disk queue failed");
364 		rdc_group_exit(krdc);
365 	}
366 	mutex_enter(QHEADLOCK(dq));
367 	mutex_enter(QLOCK(dq));
368 	/*
369 	 * quick stop of the flushers
370 	 * other cleanup is done on the un-failing of the diskq
371 	 */
372 	SET_QHEAD(dq, RDC_DISKQ_DATA_OFF);
373 	SET_QTAIL(dq, RDC_DISKQ_DATA_OFF);
374 	SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF);
375 	SET_LASTQTAIL(dq, 0);
376 
377 	rdc_dump_iohdrs(dq);
378 
379 	mutex_exit(QLOCK(dq));
380 	mutex_exit(QHEADLOCK(dq));
381 
382 	bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE *
383 	    BMAP_REF_PREF_SIZE);
384 
385 	if (flag & RDC_DOLOG) /* otherwise, we already have the conf lock */
386 		rdc_group_enter(krdc);
387 
388 	else if (!(flag & RDC_GROUP_LOCKED))
389 		ASSERT(MUTEX_HELD(&rdc_conf_lock));
390 
391 	if (!(flag & RDC_NOFAIL)) {
392 		rdc_set_flags(q, RDC_DISKQ_FAILED);
393 	}
394 	rdc_clr_flags(q, RDC_QUEUING);
395 
396 	for (p = krdc->group_next; p != krdc; p = p->group_next) {
397 		q = &rdc_u_info[p->index];
398 		if (!IS_ENABLED(q))
399 			continue;
400 		if (!(flag & RDC_NOFAIL)) {
401 			rdc_set_flags(q, RDC_DISKQ_FAILED);
402 		}
403 		rdc_clr_flags(q, RDC_QUEUING);
404 		bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE *
405 		    BMAP_REF_PREF_SIZE);
406 		/* RDC_QUEUING is cleared in group_log() */
407 	}
408 
409 	if (flag & RDC_DOLOG)
410 		rdc_group_exit(krdc);
411 
412 	/* can't wait for myself to go away, I'm a flusher */
413 	if (wait & RDC_WAIT)
414 		while (group->rdc_thrnum)
415 			delay(2);
416 
417 }
418 
419 /*
420  * rdc_stamp_diskq
421  * write out diskq header info
422  * must have disk_qlock held
423  * if rsrvd flag is 0, the nsc_reserve is done
424  */
425 int
426 rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags)
427 {
428 	nsc_vec_t	vec[2];
429 	nsc_buf_t	*head = NULL;
430 	rdc_group_t	*grp;
431 	rdc_u_info_t	*urdc;
432 	disk_queue	*q;
433 	int		rc, flags;
434 
435 	grp = krdc->group;
436 	q = &krdc->group->diskq;
437 
438 	ASSERT(MUTEX_HELD(&q->disk_qlock));
439 
440 	urdc = &rdc_u_info[krdc->index];
441 
442 	if (!rsrvd && _rdc_rsrv_diskq(grp)) {
443 		cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed",
444 		    urdc->disk_queue);
445 		mutex_exit(QLOCK(q));
446 		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
447 		mutex_enter(QLOCK(q));
448 		return (-1);
449 	}
450 	flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA;
451 	rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head);
452 
453 	if (!RDC_SUCCESS(rc)) {
454 		cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s",
455 		    &urdc->disk_queue[0]);
456 		mutex_exit(QLOCK(q));
457 		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
458 		mutex_enter(QLOCK(q));
459 		return (-1);
460 	}
461 	vec[0].sv_len = FBA_SIZE(1);
462 	vec[0].sv_addr = (uchar_t *)&q->disk_hdr;
463 	vec[1].sv_len = 0;
464 	vec[1].sv_addr = NULL;
465 
466 	head->sb_vec = &vec[0];
467 
468 #ifdef DEBUG_DISKQ
469 	cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: "
470 	    "%x head: %d tail: %d size: %d nitems: %d blocks: %d",
471 	    q, QMAGIC(q), QSTATE(q), QHEAD(q),
472 	    QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q));
473 #endif
474 
475 	rc = nsc_write(head, 0, 1, 0);
476 
477 	if (!RDC_SUCCESS(rc)) {
478 		if (!rsrvd)
479 			_rdc_rlse_diskq(grp);
480 		cmn_err(CE_CONT, "!disk queue %s failed rc %d",
481 		    &urdc->disk_queue[0], rc);
482 		mutex_exit(QLOCK(q));
483 		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
484 		mutex_enter(QLOCK(q));
485 		return (-1);
486 	}
487 
488 	(void) nsc_free_buf(head);
489 	if (!rsrvd)
490 		_rdc_rlse_diskq(grp);
491 
492 	return (0);
493 }
494 
495 /*
496  * rdc_init_diskq_header
497  * load initial values into the header
498  */
499 void
500 rdc_init_diskq_header(rdc_group_t *grp, dqheader *header)
501 {
502 	int rc;
503 	int type = 0;
504 	disk_queue *q = &grp->diskq;
505 
506 	ASSERT(MUTEX_HELD(QLOCK(q)));
507 
508 	/* save q type if this is a failure */
509 	if (QSTATE(q) & RDC_QNOBLOCK)
510 		type = RDC_QNOBLOCK;
511 	bzero(header, sizeof (*header));
512 	header->h.magic = RDC_DISKQ_MAGIC;
513 	header->h.vers = RDC_DISKQ_VERS;
514 	header->h.state |= (RDC_SHUTDOWN_BAD|type); /* SHUTDOWN_OK on suspend */
515 	header->h.head_offset = RDC_DISKQ_DATA_OFF;
516 	header->h.tail_offset = RDC_DISKQ_DATA_OFF;
517 	header->h.nitems = 0;
518 	header->h.blocks = 0;
519 	header->h.qwrap = 0;
520 	SET_QNXTIO(q, QHEAD(q));
521 	SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF);
522 
523 	/* do this last, as this might be a failure. get the kernel state ok */
524 	rc = _rdc_rsrv_diskq(grp);
525 	if (!RDC_SUCCESS(rc)) {
526 		cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue");
527 		return;
528 	}
529 	(void) nsc_partsize(grp->diskqfd, &header->h.disk_size);
530 	_rdc_rlse_diskq(grp);
531 
532 }
533 
534 /*
535  * rdc_unfail_diskq
536  * the diskq failed for some reason, lets try and re-start it
537  * the old stuff has already been thrown away
538  * should just be called from rdc_sync
539  */
540 void
541 rdc_unfail_diskq(rdc_k_info_t *krdc)
542 {
543 	rdc_k_info_t *p;
544 	rdc_u_info_t *q = &rdc_u_info[krdc->index];
545 	rdc_group_t *group = krdc->group;
546 	disk_queue *dq = &group->diskq;
547 
548 	rdc_group_enter(krdc);
549 	rdc_clr_flags(q, RDC_ASYNC);
550 	/* someone else won the race... */
551 	if (!IS_STATE(q, RDC_DISKQ_FAILED)) {
552 		rdc_group_exit(krdc);
553 		return;
554 	}
555 	rdc_clr_flags(q, RDC_DISKQ_FAILED);
556 	for (p = krdc->group_next; p != krdc; p = p->group_next) {
557 		q = &rdc_u_info[p->index];
558 		if (!IS_ENABLED(q))
559 			continue;
560 		rdc_clr_flags(q, RDC_DISKQ_FAILED);
561 		rdc_clr_flags(q, RDC_ASYNC);
562 		if (IS_STATE(q, RDC_QUEUING))
563 			rdc_clr_flags(q, RDC_QUEUING);
564 	}
565 	rdc_group_exit(krdc);
566 
567 	mutex_enter(QLOCK(dq));
568 
569 	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
570 	/* real i/o to the queue */
571 	/* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */
572 	krdc->aux_state &= ~RDC_AUXSYNCIP;
573 	if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) {
574 		mutex_exit(QLOCK(dq));
575 		goto fail;
576 	}
577 
578 	SET_QNXTIO(dq, QHEAD(dq));
579 	SET_QHDRCNT(dq, 0);
580 	SET_QSTATE(dq, RDC_SHUTDOWN_BAD); /* only suspend can write good */
581 	dq->iohdrs = NULL;
582 	dq->hdr_last = NULL;
583 
584 	/* should be none, but.. */
585 	rdc_dump_iohdrs(dq);
586 
587 	mutex_exit(QLOCK(dq));
588 
589 
590 fail:
591 	krdc->aux_state |= RDC_AUXSYNCIP;
592 	return;
593 
594 }
595 
596 int
597 rdc_read_diskq_header(rdc_k_info_t *krdc)
598 {
599 	int rc;
600 	diskq_header *header;
601 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
602 
603 	if (krdc->group->diskqfd == NULL) {
604 		char buf[NSC_MAXPATH];
605 		(void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
606 		    &urdc->secondary.intf[0]);
607 		cmn_err(CE_WARN, "!Disk Queue Header read failed for %s",
608 		    &urdc->group_name[0] == '\0' ? buf:
609 		    &urdc->group_name[0]);
610 		return (-1);
611 	}
612 
613 	header = &krdc->group->diskq.disk_hdr.h;
614 	if (_rdc_rsrv_diskq(krdc->group)) {
615 		return (-1);
616 	}
617 
618 	rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0,
619 	    (uchar_t *)header, sizeof (diskq_header));
620 
621 	_rdc_rlse_diskq(krdc->group);
622 
623 	if (!RDC_SUCCESS(rc)) {
624 		char buf[NSC_MAXPATH];
625 		(void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
626 		    &urdc->secondary.file[0]);
627 		cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s",
628 		    rc, &urdc->group_name[0] == '\0' ? buf :
629 		    &urdc->group_name[0]);
630 		return (-1);
631 	}
632 	return (0);
633 }
634 
635 /*
636  * rdc_stop_diskq_flusher
637  */
638 void
639 rdc_stop_diskq_flusher(rdc_k_info_t *krdc)
640 {
641 	disk_queue q, *qp;
642 	rdc_group_t *group;
643 #ifdef DEBUG
644 	cmn_err(CE_NOTE, "!stopping flusher threads");
645 #endif
646 	group = krdc->group;
647 	qp = &krdc->group->diskq;
648 
649 	/* save the queue info */
650 	q = *qp;
651 
652 	/* lie a little */
653 	SET_QTAIL(qp, RDC_DISKQ_DATA_OFF);
654 	SET_QHEAD(qp, RDC_DISKQ_DATA_OFF);
655 	SET_QSTATE(qp, RDC_QDISABLEPEND);
656 	SET_QSTATE(qp, RDC_STOPPINGFLUSH);
657 
658 	/* drop locks to allow flushers to die */
659 	mutex_exit(QLOCK(qp));
660 	mutex_exit(QHEADLOCK(qp));
661 	rdc_group_exit(krdc);
662 
663 	while (group->rdc_thrnum)
664 		delay(2);
665 
666 	rdc_group_enter(krdc);
667 	mutex_enter(QHEADLOCK(qp));
668 	mutex_enter(QLOCK(qp));
669 
670 	CLR_QSTATE(qp, RDC_STOPPINGFLUSH);
671 	*qp = q;
672 }
673 
674 /*
675  * rdc_enable_diskq
676  * open the diskq
677  * and stamp the header onto it.
678  */
679 int
680 rdc_enable_diskq(rdc_k_info_t *krdc)
681 {
682 	rdc_group_t *group;
683 	disk_queue *q;
684 
685 	group = krdc->group;
686 	q = &group->diskq;
687 
688 	if (rdc_open_diskq(krdc) < 0)
689 		goto fail;
690 
691 	mutex_enter(QLOCK(q));
692 	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
693 
694 	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
695 		mutex_exit(QLOCK(q));
696 		goto fail;
697 	}
698 
699 	SET_QNXTIO(q, QHEAD(q));
700 
701 	mutex_exit(QLOCK(q));
702 	return (0);
703 
704 fail:
705 	mutex_enter(&group->diskqmutex);
706 	rdc_close_diskq(group);
707 	mutex_exit(&group->diskqmutex);
708 
709 	/* caller has to fail diskq after dropping conf & many locks */
710 	return (RDC_EQNOADD);
711 }
712 
713 /*
714  * rdc_resume_diskq
715  * open the diskq and read the header
716  */
717 int
718 rdc_resume_diskq(rdc_k_info_t *krdc)
719 {
720 	rdc_u_info_t *urdc;
721 	rdc_group_t *group;
722 	disk_queue *q;
723 	int rc = 0;
724 
725 	urdc = &rdc_u_info[krdc->index];
726 	group = krdc->group;
727 	q = &group->diskq;
728 
729 	if (rdc_open_diskq(krdc) < 0) {
730 		rc = RDC_EQNOADD;
731 		goto fail;
732 	}
733 
734 	mutex_enter(QLOCK(q));
735 
736 	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
737 
738 	if (rdc_read_diskq_header(krdc) < 0) {
739 		SET_QSTATE(q, RDC_QBADRESUME);
740 		rc = RDC_EQNOADD;
741 	}
742 
743 	/* check diskq magic number */
744 	if (QMAGIC(q) != RDC_DISKQ_MAGIC) {
745 		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
746 		    " incorrect magic number in header", urdc->disk_queue);
747 		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
748 		SET_QSTATE(q, RDC_QBADRESUME);
749 		rc = RDC_EQNOADD;
750 	} else switch (QVERS(q)) {
751 		diskq_header1 h1;	/* version 1 header */
752 		diskq_header *hc;	/* current header */
753 
754 #ifdef	NSC_MULTI_TERABYTE
755 		case RDC_DISKQ_VER_ORIG:
756 			/* version 1 diskq header, upgrade to 64bit version */
757 		h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h);
758 		hc = &group->diskq.disk_hdr.h;
759 
760 		cmn_err(CE_WARN, "!SNDR: old version header for diskq %s,"
761 		    " upgrading to current version", urdc->disk_queue);
762 		hc->vers = RDC_DISKQ_VERS;
763 		hc->state = h1.state;
764 		hc->head_offset = h1.head_offset;
765 		hc->tail_offset = h1.tail_offset;
766 		hc->disk_size = h1.disk_size;
767 		hc->nitems = h1.nitems;
768 		hc->blocks = h1.blocks;
769 		hc->qwrap = h1.qwrap;
770 		hc->auxqwrap = h1.auxqwrap;
771 		hc->seq_last = h1.seq_last;
772 		hc->ack_last = h1.ack_last;
773 
774 		if (hc->nitems > 0) {
775 			cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
776 			    " old version Q contains data", urdc->disk_queue);
777 			rdc_init_diskq_header(group, &group->diskq.disk_hdr);
778 			SET_QSTATE(q, RDC_QBADRESUME);
779 			rc = RDC_EQNOADD;
780 		}
781 		break;
782 #else
783 		case RDC_DISKQ_VER_64BIT:
784 			cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
785 			    " diskq header newer than current version",
786 			    urdc->disk_queue);
787 			rdc_init_diskq_header(group, &group->diskq.disk_hdr);
788 			SET_QSTATE(q, RDC_QBADRESUME);
789 			rc = RDC_EQNOADD;
790 		break;
791 #endif
792 		case RDC_DISKQ_VERS:
793 			/* okay, current version diskq */
794 		break;
795 		default:
796 			cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
797 			    " unknown diskq header version", urdc->disk_queue);
798 			rdc_init_diskq_header(group, &group->diskq.disk_hdr);
799 			SET_QSTATE(q, RDC_QBADRESUME);
800 			rc = RDC_EQNOADD;
801 		break;
802 	}
803 	if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) {
804 		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
805 		    " unsafe shutdown", urdc->disk_queue);
806 		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
807 		SET_QSTATE(q, RDC_QBADRESUME);
808 		rc = RDC_EQNOADD;
809 	}
810 
811 	CLR_QSTATE(q, RDC_SHUTDOWN_OK);
812 	SET_QSTATE(q, RDC_SHUTDOWN_BAD);
813 
814 	/* bad, until proven not bad */
815 	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
816 		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG);
817 		rc = RDC_EQNOADD;
818 	}
819 
820 	SET_QNXTIO(q, QHEAD(q));
821 	group->diskq.nitems_hwm = QNITEMS(q);
822 	group->diskq.blocks_hwm = QBLOCKS(q);
823 
824 	mutex_exit(QLOCK(q));
825 
826 #ifdef DEBUG
827 	cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s \n",
828 	    urdc->disk_queue);
829 	cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
830 #endif
831 	if (rc == 0)
832 		return (0);
833 
834 fail:
835 
836 	/* caller has to set the diskq failed after dropping it's locks */
837 	return (rc);
838 
839 }
840 
841 int
842 rdc_suspend_diskq(rdc_k_info_t *krdc)
843 {
844 	int rc;
845 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
846 	disk_queue *q;
847 
848 	q = &krdc->group->diskq;
849 
850 	/* grab both diskq locks as we are going to kill the flusher */
851 	mutex_enter(QHEADLOCK(q));
852 	mutex_enter(QLOCK(q));
853 
854 	if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) {
855 		SET_QSTATE(q, RDC_STOPPINGFLUSH);
856 		rdc_stop_diskq_flusher(krdc);
857 		CLR_QSTATE(q, RDC_STOPPINGFLUSH);
858 	}
859 
860 	krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD;
861 	krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK;
862 	krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME;
863 
864 	/* let's make sure that the flusher has stopped.. */
865 	if (krdc->group->rdc_thrnum) {
866 		mutex_exit(QLOCK(q));
867 		mutex_exit(QHEADLOCK(q));
868 		rdc_group_exit(krdc);
869 
870 		while (krdc->group->rdc_thrnum)
871 			delay(5);
872 
873 		rdc_group_enter(krdc);
874 		mutex_enter(QLOCK(q));
875 		mutex_enter(QHEADLOCK(q));
876 	}
877 	/* write refcount to the bitmap */
878 	if ((rc = rdc_write_refcount(krdc)) < 0) {
879 		rdc_group_exit(krdc);
880 		goto fail;
881 	}
882 
883 	if (!QEMPTY(q)) {
884 		rdc_set_flags(urdc, RDC_QUEUING);
885 	} else {
886 		rdc_clr_flags(urdc, RDC_QUEUING);
887 	}
888 
889 	/* fill in diskq header info */
890 	krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND;
891 
892 #ifdef DEBUG
893 	cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q));
894 #endif
895 
896 	/* to avoid a possible deadlock, release in order, and reacquire */
897 	mutex_exit(QLOCK(q));
898 	mutex_exit(QHEADLOCK(q));
899 
900 	if (krdc->group->count > 1) {
901 		rdc_group_exit(krdc);
902 		goto fail; /* just stamp on the last suspend */
903 	}
904 	rdc_group_exit(krdc); /* in case this stamp fails */
905 	mutex_enter(QLOCK(q));
906 
907 	rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG);
908 
909 	mutex_exit(QLOCK(q));
910 
911 fail:
912 	rdc_group_enter(krdc);
913 
914 	/* diskq already failed if stamp failed */
915 
916 	return (rc);
917 }
918 
919 /*
920  * copy orig aio to copy, including the nsc_buf_t
921  */
922 int
923 rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy)
924 {
925 	int rc;
926 	bcopy(orig, copy, sizeof (*orig));
927 	copy->handle = NULL;
928 
929 	if (orig->handle == NULL) /* no buf to alloc/copy */
930 		return (0);
931 
932 	rc = nsc_alloc_abuf(orig->pos, orig->len, 0, &copy->handle);
933 	if (!RDC_SUCCESS(rc)) {
934 #ifdef DEBUG
935 		cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc);
936 #endif
937 		return (rc);
938 	}
939 	rc = nsc_copy(orig->handle, copy->handle, orig->pos,
940 	    orig->pos, orig->len);
941 	if (!RDC_SUCCESS(rc)) {
942 		(void) nsc_free_buf(copy->handle);
943 #ifdef DEBUG
944 		cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc);
945 #endif
946 		return (rc);
947 	}
948 	return (0);
949 }
950 
951 /*
952  * rdc_qfill_shldwakeup()
953  * 0 if the memory queue has filled, and the low water
954  * mark has not been reached. 0 if diskq is empty.
955  * 1 if less than low water mark
956  * net_queue mutex is already held
957  */
958 int
959 rdc_qfill_shldwakeup(rdc_k_info_t *krdc)
960 {
961 	rdc_group_t *group = krdc->group;
962 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
963 	net_queue *nq = &group->ra_queue;
964 	disk_queue *dq = &group->diskq;
965 
966 	ASSERT(MUTEX_HELD(&nq->net_qlock));
967 
968 	if (!RDC_IS_DISKQ(krdc->group))
969 		return (0);
970 
971 	if (nq->qfill_sleeping != RDC_QFILL_ASLEEP)
972 		return (0);
973 
974 	if (nq->qfflags & RDC_QFILLSTOP)
975 		return (1);
976 
977 	if (nq->qfflags & RDC_QFILLSLEEP)
978 		return (0);
979 
980 	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING))
981 		return (0);
982 
983 	mutex_enter(QLOCK(dq));
984 	if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
985 		mutex_exit(QLOCK(dq));
986 		return (0);
987 	}
988 	mutex_exit(QLOCK(dq));
989 
990 	if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) {
991 		if (nq->hwmhit) {
992 			if (nq->blocks <= RDC_LOW_QBLOCKS) {
993 				nq->hwmhit = 0;
994 			} else {
995 				return (0);
996 			}
997 		}
998 #ifdef DEBUG_DISKQ_NOISY
999 		cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x"
1000 		    " idx: %d", rdc_get_vflags(urdc), urdc->index);
1001 #endif
1002 		return (1);
1003 	}
1004 	return (0);
1005 
1006 }
1007 
1008 /*
1009  * rdc_diskq_enqueue
1010  * enqueue one i/o to the diskq
1011  * after appending some metadata to the front
1012  */
1013 int
1014 rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
1015 {
1016 	nsc_vec_t	*vec = NULL;
1017 	nsc_buf_t	*bp = NULL;
1018 	nsc_buf_t	*qbuf = NULL;
1019 	io_hdr		*iohdr = NULL;
1020 	disk_queue	*q;
1021 	rdc_group_t	*group;
1022 	int		numvecs;
1023 	int		i, j, rc = 0;
1024 	int		retries = 0;
1025 	rdc_u_info_t	*urdc;
1026 	nsc_size_t	iofbas; /* len of io + io header len */
1027 	int		qtail;
1028 	int		delay_time = 2;
1029 	int 		print_msg = 1;
1030 
1031 #ifdef DEBUG_WRITER_UBERNOISE
1032 	int		qhead;
1033 #endif
1034 	urdc = &rdc_u_info[krdc->index];
1035 	group = krdc->group;
1036 	q = &group->diskq;
1037 
1038 	mutex_enter(QLOCK(q));
1039 
1040 	/*
1041 	 * there is a thread that is blocking because the queue is full,
1042 	 * don't try to set up this write until all is clear
1043 	 * check before and after for logging or failed queue just
1044 	 * in case a thread was in flight while the queue was full,
1045 	 * and in the proccess of failing
1046 	 */
1047 	while (IS_QSTATE(q, RDC_QFULL)) {
1048 		if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1049 		    (IS_STATE(urdc, RDC_LOGGING) &&
1050 		    !IS_STATE(urdc, RDC_QUEUING))) {
1051 			mutex_exit(QLOCK(q));
1052 			if (aio->handle)
1053 				(void) nsc_free_buf(aio->handle);
1054 			return (-1);
1055 		}
1056 		cv_wait(&q->qfullcv, QLOCK(q));
1057 
1058 		if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1059 		    (IS_STATE(urdc, RDC_LOGGING) &&
1060 		    !IS_STATE(urdc, RDC_QUEUING))) {
1061 			mutex_exit(QLOCK(q));
1062 			if (aio->handle)
1063 				(void) nsc_free_buf(aio->handle);
1064 			return (-1);
1065 		}
1066 
1067 	}
1068 
1069 	SET_QSTATE(q, QTAILBUSY);
1070 
1071 	if (aio->handle == NULL) {
1072 		/* we're only going to write the header to the queue */
1073 		numvecs = 2; /* kmem_alloc io header + null terminate */
1074 		iofbas = FBA_LEN(sizeof (io_hdr));
1075 
1076 	} else {
1077 		/* find out how many vecs */
1078 		numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1;
1079 		iofbas = aio->len + FBA_LEN(sizeof (io_hdr));
1080 	}
1081 
1082 	/*
1083 	 * this, in conjunction with QTAILBUSY, will prevent
1084 	 * premature dequeuing
1085 	 */
1086 
1087 	SET_LASTQTAIL(q, QTAIL(q));
1088 
1089 	iohdr = (io_hdr *) kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP);
1090 	vec = (nsc_vec_t *) kmem_zalloc(sizeof (nsc_vec_t) * numvecs,
1091 	    KM_NOSLEEP);
1092 
1093 	if (!vec || !iohdr) {
1094 		if (!vec) {
1095 			cmn_err(CE_WARN, "!vec kmem alloc failed");
1096 		} else {
1097 			cmn_err(CE_WARN, "!iohdr kmem alloc failed");
1098 		}
1099 		if (vec)
1100 			kmem_free(vec, sizeof (*vec));
1101 		if (iohdr)
1102 			kmem_free(iohdr, sizeof (*iohdr));
1103 		CLR_QSTATE(q, QTAILBUSY);
1104 		SET_LASTQTAIL(q, 0);
1105 		mutex_exit(QLOCK(q));
1106 		if (aio->handle)
1107 			(void) nsc_free_buf(aio->handle);
1108 		return (ENOMEM);
1109 	}
1110 
1111 	vec[numvecs - 1].sv_len = 0;
1112 	vec[numvecs - 1].sv_addr = 0;
1113 
1114 	/* now add the write itself */
1115 	bp = aio->handle;
1116 
1117 	for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr &&
1118 	    i < numvecs; i++, j++) {
1119 		vec[i].sv_len = bp->sb_vec[j].sv_len;
1120 		vec[i].sv_addr = bp->sb_vec[j].sv_addr;
1121 	}
1122 
1123 retry:
1124 
1125 	/* check for queue wrap, then check for overflow */
1126 	if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1127 	    (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) {
1128 		kmem_free(iohdr, sizeof (*iohdr));
1129 		kmem_free(vec, sizeof (*vec) * numvecs);
1130 		CLR_QSTATE(q, QTAILBUSY);
1131 		SET_LASTQTAIL(q, 0);
1132 		if (IS_QSTATE(q, RDC_QFULL)) { /* wakeup blocked threads */
1133 			CLR_QSTATE(q, RDC_QFULL);
1134 			cv_broadcast(&q->qfullcv);
1135 		}
1136 		mutex_exit(QLOCK(q));
1137 		if (aio->handle)
1138 			(void) nsc_free_buf(aio->handle);
1139 
1140 		return (-1);
1141 	}
1142 
1143 	if (QTAILSHLDWRAP(q, iofbas)) {
1144 		/*
1145 		 * just go back to the beginning of the disk
1146 		 * it's not worth the trouble breaking up the write
1147 		 */
1148 #ifdef DEBUG_DISKQWRAP
1149 		cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q));
1150 #endif
1151 		/*LINTED*/
1152 		WRAPQTAIL(q);
1153 	}
1154 
1155 	/*
1156 	 * prepend the write's metadata
1157 	 */
1158 	rdc_fill_ioheader(aio, iohdr, QTAIL(q));
1159 
1160 	vec[0].sv_len = FBA_SIZE(1);
1161 	vec[0].sv_addr = (uchar_t *)iohdr;
1162 
1163 	/* check for tail < head */
1164 
1165 	if (!(FITSONQ(q, iofbas))) {
1166 		/*
1167 		 * don't allow any more writes to start
1168 		 */
1169 		SET_QSTATE(q, RDC_QFULL);
1170 		mutex_exit(QLOCK(q));
1171 
1172 		if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1173 			(void) rdc_writer(krdc->index);
1174 
1175 		delay(delay_time);
1176 		q->throttle_delay += delay_time;
1177 		retries++;
1178 		delay_time *= 2; /* fairly aggressive */
1179 		if ((retries >= 8) || (delay_time >= 256)) {
1180 			delay_time = 2;
1181 			if (print_msg) {
1182 				cmn_err(CE_WARN, "!enqueue: disk queue %s full",
1183 				    &urdc->disk_queue[0]);
1184 				print_msg = 0;
1185 #ifdef DEBUG
1186 				cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
1187 #else
1188 				cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1189 #endif
1190 			}
1191 			/*
1192 			 * if this is a no-block queue, or this is a blocking
1193 			 * queue that is not flushing. reset and log
1194 			 */
1195 			if ((QSTATE(q) & RDC_QNOBLOCK) ||
1196 			    (IS_STATE(urdc, RDC_QUEUING))) {
1197 
1198 				if (IS_STATE(urdc, RDC_QUEUING)) {
1199 		cmn_err(CE_WARN, "!SNDR: disk queue %s full and not flushing. "
1200 		    "giving up", &urdc->disk_queue[0]);
1201 		cmn_err(CE_WARN, "!SNDR: %s:%s entering logging mode",
1202 		    urdc->secondary.intf, urdc->secondary.file);
1203 				}
1204 
1205 				rdc_fail_diskq(krdc, RDC_WAIT,
1206 				    RDC_DOLOG | RDC_NOFAIL);
1207 				kmem_free(iohdr, sizeof (*iohdr));
1208 				kmem_free(vec, sizeof (*vec) * numvecs);
1209 				mutex_enter(QLOCK(q));
1210 				CLR_QSTATE(q, QTAILBUSY | RDC_QFULL);
1211 				cv_broadcast(&q->qfullcv);
1212 				mutex_exit(QLOCK(q));
1213 				SET_LASTQTAIL(q, 0);
1214 				if (aio->handle)
1215 					(void) nsc_free_buf(aio->handle);
1216 				return (ENOMEM);
1217 			}
1218 		}
1219 
1220 		mutex_enter(QLOCK(q));
1221 		goto retry;
1222 
1223 	}
1224 
1225 	qtail = QTAIL(q);
1226 #ifdef DEBUG_WRITER_UBERNOISE
1227 	qhead = QHEAD(q);
1228 #endif
1229 
1230 	/* update tail pointer, nitems on queue and blocks on queue */
1231 	INC_QTAIL(q, iofbas); /* increment tail over i/o size + ioheader size */
1232 	INC_QNITEMS(q, 1);
1233 	/* increment counter for i/o blocks only */
1234 	INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr))));
1235 
1236 	if (QNITEMS(q) > q->nitems_hwm)
1237 		q->nitems_hwm = QNITEMS(q);
1238 	if (QBLOCKS(q) > q->blocks_hwm)
1239 		q->blocks_hwm = QBLOCKS(q);
1240 
1241 	if (IS_QSTATE(q, RDC_QFULL)) {
1242 		CLR_QSTATE(q, RDC_QFULL);
1243 		cv_broadcast(&q->qfullcv);
1244 	}
1245 
1246 	mutex_exit(QLOCK(q));
1247 
1248 	/*
1249 	 * if (krdc->io_kstats) {
1250 	 *	mutex_enter(krdc->io_kstats->ks_lock);
1251 	 *	kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
1252 	 *	mutex_exit(krdc->io_kstats->ks_lock);
1253 	 * }
1254 	 */
1255 
1256 	DTRACE_PROBE(rdc_diskq_rsrv);
1257 
1258 	if (_rdc_rsrv_diskq(group)) {
1259 		cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed",
1260 		    &urdc->disk_queue[0]);
1261 		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1262 		kmem_free(iohdr, sizeof (*iohdr));
1263 		kmem_free(vec, sizeof (*vec) * numvecs);
1264 		mutex_enter(QLOCK(q));
1265 		CLR_QSTATE(q, QTAILBUSY);
1266 		SET_LASTQTAIL(q, 0);
1267 		mutex_exit(QLOCK(q));
1268 		if (aio->handle)
1269 			(void) nsc_free_buf(aio->handle);
1270 		return (-1);
1271 	}
1272 
1273 /* XXX for now do this, but later pre-alloc handle in enable/resume */
1274 
1275 	DTRACE_PROBE(rdc_diskq_alloc_start);
1276 	rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas,
1277 	    NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf);
1278 
1279 	DTRACE_PROBE(rdc_diskq_alloc_end);
1280 
1281 	if (!RDC_SUCCESS(rc)) {
1282 		cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT,
1283 		    &urdc->disk_queue[0], rc, iofbas);
1284 		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1285 		rc = ENOMEM;
1286 		goto fail;
1287 	}
1288 	/* move vec and write to queue */
1289 	qbuf->sb_vec = &vec[0];
1290 
1291 #ifdef DEBUG_WRITER_UBERNOISE
1292 
1293 	cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, "
1294 	    "qtail: %d, len: %d contents: %c%c%c%c%c",
1295 	    (void *) qbuf, qhead, qtail, iofbas,
1296 	    qbuf->sb_vec[1].sv_addr[0],
1297 	    qbuf->sb_vec[1].sv_addr[1],
1298 	    qbuf->sb_vec[1].sv_addr[2],
1299 	    qbuf->sb_vec[1].sv_addr[3],
1300 	    qbuf->sb_vec[1].sv_addr[4]);
1301 	cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1302 
1303 #endif
1304 
1305 	DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas);
1306 	rc = nsc_write(qbuf, qtail, iofbas, 0);
1307 	DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas);
1308 
1309 	if (!RDC_SUCCESS(rc)) {
1310 		cmn_err(CE_WARN, "!disk queue %s write failed %d",
1311 		    &urdc->disk_queue[0], rc);
1312 		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1313 		goto fail;
1314 
1315 	}
1316 
1317 	mutex_enter(QLOCK(q));
1318 
1319 	SET_LASTQTAIL(q, 0);
1320 	CLR_QSTATE(q, QTAILBUSY);
1321 
1322 	mutex_exit(QLOCK(q));
1323 
1324 fail:
1325 
1326 	/*
1327 	 * return what should be returned
1328 	 * the aio is returned in _rdc_write after status is gathered.
1329 	 */
1330 
1331 	if (qbuf)
1332 		qbuf->sb_vec = 0;
1333 	(void) nsc_free_buf(qbuf);
1334 
1335 	if (aio->handle)
1336 		(void) nsc_free_buf(aio->handle);
1337 
1338 	_rdc_rlse_diskq(group);
1339 	DTRACE_PROBE(rdc_diskq_rlse);
1340 
1341 	/* free the iohdr and the vecs */
1342 
1343 	if (iohdr)
1344 		kmem_free(iohdr, sizeof (*iohdr));
1345 	if (vec)
1346 		kmem_free(vec, sizeof (*vec) * numvecs);
1347 
1348 	/* if no flusher running, start one */
1349 	if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1350 		(void) rdc_writer(krdc->index);
1351 
1352 	return (rc);
1353 }
1354 
1355 /*
1356  * place this on the pending list of io_hdr's out for flushing
1357  */
1358 void
1359 rdc_add_iohdr(io_hdr *header, rdc_group_t *group)
1360 {
1361 	disk_queue *q = NULL;
1362 #ifdef DEBUG
1363 	io_hdr *p;
1364 #endif
1365 
1366 	q = &group->diskq;
1367 
1368 	/* paranoia */
1369 	header->dat.next = NULL;
1370 
1371 	mutex_enter(QLOCK(q));
1372 #ifdef DEBUG /* AAAH! double flush!? */
1373 	p = q->iohdrs;
1374 	while (p) {
1375 		if (p->dat.qpos == header->dat.qpos) {
1376 			cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT,
1377 			    p->dat.qpos);
1378 			kmem_free(header, sizeof (*header));
1379 			mutex_exit(QLOCK(q));
1380 			return;
1381 		}
1382 		p = p->dat.next;
1383 	}
1384 #endif
1385 	if (q->iohdrs == NULL) {
1386 		q->iohdrs = q->hdr_last = header;
1387 		q->hdrcnt = 1;
1388 		mutex_exit(QLOCK(q));
1389 		return;
1390 	}
1391 
1392 	q->hdr_last->dat.next = header;
1393 	q->hdr_last = header;
1394 	q->hdrcnt++;
1395 	mutex_exit(QLOCK(q));
1396 	return;
1397 
1398 }
1399 
1400 /*
1401  * mark an io header as flushed. If it is the qhead,
1402  * then update the qpointers
1403  * free the io_hdrs
1404  * called after the bitmap is cleared by flusher
1405  */
1406 void
1407 rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos)
1408 {
1409 	rdc_group_t *group = krdc->group;
1410 	disk_queue *q = NULL;
1411 	io_hdr	*hp = NULL;
1412 	io_hdr	*p = NULL;
1413 	int found = 0;
1414 	int cnt = 0;
1415 
1416 #ifndef NSC_MULTI_TERABYTE
1417 	ASSERT(qpos >= 0);	/* assertion to validate change for 64bit */
1418 	if (qpos < 0) /* not a diskq offset */
1419 		return;
1420 #endif
1421 
1422 	q = &group->diskq;
1423 	mutex_enter(QLOCK(q));
1424 
1425 	hp = p = q->iohdrs;
1426 
1427 	/* find outstanding io_hdr */
1428 	while (hp) {
1429 		if (hp->dat.qpos == qpos) {
1430 			found++;
1431 			break;
1432 		}
1433 		cnt++;
1434 		p = hp;
1435 		hp = hp->dat.next;
1436 	}
1437 
1438 	if (!found) {
1439 		if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) {
1440 #ifdef DEBUG
1441 			cmn_err(CE_WARN, "!iohdr already cleared? "
1442 			"qpos %" NSC_SZFMT " cnt %d ", qpos, cnt);
1443 			cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q));
1444 #endif
1445 			mutex_exit(QLOCK(q));
1446 			return;
1447 		}
1448 		mutex_exit(QLOCK(q));
1449 		return;
1450 	}
1451 
1452 	/* mark it as flushed */
1453 	hp->dat.iostatus = RDC_IOHDR_DONE;
1454 
1455 	/*
1456 	 * if it is the head pointer, travel the list updating the queue
1457 	 * pointers until the next unflushed is reached, freeing on the way.
1458 	 */
1459 	while (hp && (hp->dat.qpos == QHEAD(q)) &&
1460 	    (hp->dat.iostatus == RDC_IOHDR_DONE)) {
1461 #ifdef DEBUG_FLUSHER_UBERNOISE
1462 		cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d"
1463 		    " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d",
1464 		    hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos,
1465 		    hp->dat.hpos, hp->dat.len, hp->dat.flag,
1466 		    hp->dat.iostatus, hp->dat.setid);
1467 #endif
1468 		if (hp->dat.flag & RDC_NULL_BUF) {
1469 			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)));
1470 		} else {
1471 			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len);
1472 			DEC_QBLOCKS(q, hp->dat.len);
1473 		}
1474 
1475 		DEC_QNITEMS(q, 1);
1476 
1477 		if (QHEADSHLDWRAP(q)) { /* simple enough */
1478 #ifdef DEBUG_DISKQWRAP
1479 			cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q));
1480 #endif
1481 			/*LINTED*/
1482 			WRAPQHEAD(q);
1483 		}
1484 
1485 		/* get rid of the iohdr */
1486 		if (hp == q->iohdrs) {
1487 			q->iohdrs = hp->dat.next;
1488 			kmem_free(hp, sizeof (*hp));
1489 			hp = q->iohdrs;
1490 		} else {
1491 			if (hp == q->hdr_last)
1492 				q->hdr_last = p;
1493 			p->dat.next = hp->dat.next;
1494 			kmem_free(hp, sizeof (*hp));
1495 			hp = p->dat.next;
1496 		}
1497 		q->hdrcnt--;
1498 	}
1499 
1500 	if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) &&
1501 	    !(IS_QSTATE(q, RDC_QDISABLEPEND))) {
1502 #ifdef DEBUG_FLUSHER_UBERNOISE
1503 		rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1504 		cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, "
1505 		    "resetting defaults", urdc->disk_queue);
1506 #endif
1507 
1508 		rdc_init_diskq_header(group, &q->disk_hdr);
1509 		SET_QNXTIO(q, QHEAD(q));
1510 	}
1511 
1512 	/* wakeup any blocked enqueue threads */
1513 	cv_broadcast(&q->qfullcv);
1514 	mutex_exit(QLOCK(q));
1515 }
1516 
1517 /*
1518  * put in whatever useful checks we can on the io header
1519  */
1520 int
1521 rdc_iohdr_ok(io_hdr *hdr)
1522 {
1523 	if (hdr->dat.magic != RDC_IOHDR_MAGIC)
1524 		goto bad;
1525 	return (1);
1526 bad:
1527 
1528 #ifdef DEBUG
1529 	cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT
1530 	    " hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT
1531 	    " flag %d iostatus %d setid %d", hdr->dat.magic,
1532 	    hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos,
1533 	    hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid);
1534 #else
1535 	cmn_err(CE_WARN, "!Bad io header retrieved");
1536 #endif
1537 	return (0);
1538 }
1539 
1540 /*
1541  * rdc_netqueue_insert()
1542  * add an item to a netqueue. No locks necessary as it should only
1543  * be used in a single threaded manor. If that changes, then
1544  * a lock or assertion should be done here
1545  */
1546 void
1547 rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q)
1548 {
1549 	rdc_k_info_t *krdc = &rdc_k_info[aio->index];
1550 
1551 	/* paranoid check for bit set */
1552 	RDC_CHECK_BIT(krdc, aio->pos, aio->len);
1553 
1554 	if (q->net_qhead == NULL) {
1555 		q->net_qhead = q->net_qtail = aio;
1556 
1557 	} else {
1558 		q->net_qtail->next = aio;
1559 		q->net_qtail = aio;
1560 	}
1561 	q->blocks += aio->len;
1562 	q->nitems++;
1563 
1564 	if (q->nitems > q->nitems_hwm) {
1565 		q->nitems_hwm = q->nitems;
1566 	}
1567 	if (q->blocks > q->blocks_hwm) {
1568 		q->nitems_hwm = q->blocks;
1569 	}
1570 }
1571 
1572 /*
1573  * rdc_fill_aio(aio, hdr)
1574  * take the pertinent info from an io_hdr and stick it in
1575  * an aio, including seq number, abuf.
1576  */
1577 void
1578 rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf)
1579 {
1580 	if (hdr->dat.flag & RDC_NULL_BUF) {
1581 		aio->handle = NULL;
1582 	} else {
1583 		aio->handle = abuf;
1584 	}
1585 	aio->qhandle = abuf;
1586 	aio->pos = hdr->dat.pos;
1587 	aio->qpos = hdr->dat.qpos;
1588 	aio->len = hdr->dat.len;
1589 	aio->flag = hdr->dat.flag;
1590 	if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0)
1591 		return;
1592 	mutex_enter(&grp->diskq.disk_qlock);
1593 	if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) {
1594 		mutex_exit(&grp->diskq.disk_qlock);
1595 		aio->seq = RDC_NOSEQ;
1596 		return;
1597 	}
1598 	if (abuf && aio->qhandle) {
1599 		abuf->sb_user++;
1600 	}
1601 	aio->seq = grp->seq++;
1602 	if (grp->seq < aio->seq)
1603 		grp->seq = RDC_NEWSEQ + 1;
1604 	mutex_exit(&grp->diskq.disk_qlock);
1605 	hdr->dat.iostatus = aio->seq;
1606 
1607 }
1608 
1609 #ifdef DEBUG
int maxaios_perbuf = 0;
int midaios_perbuf = 0;
int aveaios_perbuf = 0;
int totaios_perbuf = 0;
int buf2qcalls = 0;

/*
 * calc_perbuf()
 * DEBUG statistics: track the maximum, half of the maximum, total and
 * average number of aios produced per call, where buf2qcalls is the
 * number of calls (incremented by the caller before calling here).
 *
 * When the running total has gone negative, or the call count is not
 * positive, the statistics are restarted with this call counted as
 * the first one. The call count must never be left at zero because it
 * is the divisor for the average.
 */
void
calc_perbuf(int items)
{
	if (totaios_perbuf < 0 || buf2qcalls <= 0) {
		maxaios_perbuf = 0;
		midaios_perbuf = 0;
		aveaios_perbuf = 0;
		totaios_perbuf = 0;
		buf2qcalls = 1;
	}

	if (items > maxaios_perbuf)
		maxaios_perbuf = items;
	midaios_perbuf = maxaios_perbuf / 2;
	totaios_perbuf += items;
	aveaios_perbuf = totaios_perbuf / buf2qcalls;
}
1633 #endif
1634 
1635 /*
1636  * rdc_discard_tmpq()
1637  * free up the passed temporary queue
1638  * NOTE: no cv's or mutexes have been initialized
1639  */
1640 void
1641 rdc_discard_tmpq(net_queue *q)
1642 {
1643 	rdc_aio_t *aio;
1644 
1645 	if (q == NULL)
1646 		return;
1647 
1648 	while (q->net_qhead) {
1649 		aio = q->net_qhead;
1650 		q->net_qhead = q->net_qhead->next;
1651 		if (aio->qhandle) {
1652 			aio->qhandle->sb_user--;
1653 			if (aio->qhandle->sb_user == 0) {
1654 				rdc_fixlen(aio);
1655 				(void) nsc_free_buf(aio->qhandle);
1656 			}
1657 		}
1658 		kmem_free(aio, sizeof (*aio));
1659 		q->nitems--;
1660 	}
1661 	kmem_free(q, sizeof (*q));
1662 
1663 }
1664 
1665 /*
1666  * rdc_diskq_buf2queue()
1667  * take a chunk of the diskq, parse it and assemble
1668  * a chain of rdc_aio_t's.
1669  * updates QNXTIO()
1670  */
1671 net_queue *
1672 rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index)
1673 {
1674 	rdc_aio_t *aio = NULL;
1675 	nsc_vec_t *vecp = NULL;
1676 	uchar_t *vaddr = NULL;
1677 	uchar_t *ioaddr = NULL;
1678 	net_queue *netq = NULL;
1679 	io_hdr  *hdr = NULL;
1680 	nsc_buf_t *buf = *abuf;
1681 	rdc_u_info_t *urdc = &rdc_u_info[index];
1682 	rdc_k_info_t *krdc = &rdc_k_info[index];
1683 	disk_queue *dq = &grp->diskq;
1684 	net_queue *nq = &grp->ra_queue;
1685 	int nullbuf = 0;
1686 	nsc_off_t endobuf;
1687 	nsc_off_t bufoff;
1688 	int vlen;
1689 	nsc_off_t fpos;
1690 	long bufcnt = 0;
1691 	int nullblocks = 0;
1692 	int fail = 1;
1693 
1694 	if (buf == NULL)
1695 		return (NULL);
1696 
1697 	netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP);
1698 	if (netq == NULL) {
1699 		cmn_err(CE_WARN, "!SNDR: unable to allocate net queue");
1700 		return (NULL);
1701 	}
1702 
1703 	vecp = buf->sb_vec;
1704 	vlen = vecp->sv_len;
1705 	vaddr = vecp->sv_addr;
1706 	bufoff = buf->sb_pos;
1707 	endobuf = bufoff + buf->sb_len;
1708 
1709 #ifdef DEBUG_FLUSHER_UBERNOISE
1710 	cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff);
1711 #endif
1712 	/* CONSTCOND */
1713 	while (1) {
1714 		if (IS_STATE(urdc, RDC_LOGGING) ||
1715 		    (nq->qfflags & RDC_QFILLSLEEP)) {
1716 			fail = 0;
1717 			goto fail;
1718 		}
1719 #ifdef DEBUG_FLUSHER_UBERNOISE
1720 		cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff);
1721 #endif
1722 
1723 		if ((vaddr == NULL) || (vlen == 0))
1724 			break;
1725 
1726 		if (vlen <= 0) {
1727 			vecp++;
1728 			vaddr = vecp->sv_addr;
1729 			vlen = vecp->sv_len;
1730 			if (vaddr == NULL)
1731 				break;
1732 		}
1733 
1734 		/* get the iohdr information */
1735 
1736 		hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP);
1737 		if (hdr == NULL) {
1738 			cmn_err(CE_WARN,
1739 			    "!SNDR: unable to alocate net queue header");
1740 			goto fail;
1741 		}
1742 
1743 		ioaddr = (uchar_t *)hdr;
1744 
1745 		bcopy(vaddr, ioaddr, sizeof (*hdr));
1746 
1747 		if (!rdc_iohdr_ok(hdr)) {
1748 			cmn_err(CE_WARN,
1749 			    "!unable to retrieve i/o data from queue %s "
1750 			    "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %"
1751 			    NSC_SZFMT, urdc->disk_queue,
1752 			    bufoff, buf->sb_pos, buf->sb_len);
1753 #ifdef DEBUG_DISKQ
1754 			cmn_err(CE_WARN, "!FAILING QUEUE state: %x",
1755 			    rdc_get_vflags(urdc));
1756 			cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq));
1757 			cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr);
1758 			cmn_err(CE_WARN, "!BUF %p", buf);
1759 #endif
1760 			cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq));
1761 
1762 			goto fail;
1763 		}
1764 
1765 		nullbuf = hdr->dat.flag & RDC_NULL_BUF;
1766 
1767 		bufoff += FBA_NUM(sizeof (*hdr));
1768 
1769 		/* out of buffer, set nxtio to re read this last hdr */
1770 		if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) {
1771 			break;
1772 		}
1773 
1774 		bufcnt += FBA_NUM(sizeof (*hdr));
1775 
1776 		aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1777 		if (aio == NULL) {
1778 			bufcnt -= FBA_NUM(sizeof (*hdr));
1779 			cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed");
1780 			goto fail;
1781 		}
1782 
1783 		if (!nullbuf) {
1784 			/* move to next iohdr in big buf */
1785 			bufoff += hdr->dat.len;
1786 			bufcnt += hdr->dat.len;
1787 		}
1788 
1789 		rdc_fill_aio(grp, aio, hdr, buf);
1790 
1791 		if (aio->index < 0) {
1792 			cmn_err(CE_WARN, "!Set id %d not found or no longer "
1793 			    "enabled, failing disk queue", hdr->dat.setid);
1794 			kmem_free(aio, sizeof (*aio));
1795 			goto fail;
1796 		}
1797 		if (aio->seq == RDC_NOSEQ) {
1798 			kmem_free(aio, sizeof (*aio));
1799 			fail = 0;
1800 			goto fail;
1801 		}
1802 		if (aio->handle == NULL)
1803 			nullblocks += aio->len;
1804 
1805 		rdc_add_iohdr(hdr, grp);
1806 		hdr = NULL; /* don't accidentally free on break or fail */
1807 		rdc_netqueue_insert(aio, netq);
1808 
1809 		/* no more buffer, skip the below logic */
1810 		if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) {
1811 			break;
1812 		}
1813 
1814 		fpos = bufoff - buf->sb_pos;
1815 		vecp = buf->sb_vec;
1816 		for (; fpos >= FBA_NUM(vecp->sv_len); vecp++)
1817 			fpos -= FBA_NUM(vecp->sv_len);
1818 		vlen = vecp->sv_len - FBA_SIZE(fpos);
1819 		vaddr = vecp->sv_addr + FBA_SIZE(fpos);
1820 		/* abuf = NULL; */
1821 
1822 	}
1823 
1824 	/* free extraneous header */
1825 	if (hdr) {
1826 		kmem_free(hdr, sizeof (*hdr));
1827 		hdr = NULL;
1828 	}
1829 
1830 	/*
1831 	 * probably won't happen, but if we didn't goto fail, but
1832 	 * we don't contain anything meaningful.. return NULL
1833 	 * and let the flusher or the sleep/wakeup routines
1834 	 * decide
1835 	 */
1836 	if (netq && netq->nitems == 0) {
1837 		kmem_free(netq, sizeof (*netq));
1838 		return (NULL);
1839 	}
1840 
1841 #ifdef DEBUG
1842 	buf2qcalls++;
1843 	calc_perbuf(netq->nitems);
1844 #endif
1845 	if (IS_STATE(urdc, RDC_LOGGING) ||
1846 	    nq->qfflags & RDC_QFILLSLEEP) {
1847 		fail = 0;
1848 		goto fail;
1849 	}
1850 
1851 	mutex_enter(QLOCK(dq));
1852 	INC_QNXTIO(dq, bufcnt);
1853 	mutex_exit(QLOCK(dq));
1854 
1855 	netq->net_qtail->orig_len = nullblocks; /* overload */
1856 
1857 	return (netq);
1858 
1859 fail:
1860 
1861 	if (hdr) {
1862 		kmem_free(hdr, sizeof (*hdr));
1863 	}
1864 
1865 	if (netq) {
1866 		if (netq->nitems > 0) {
1867 			/* the never can happen case ... */
1868 			if ((netq->nitems == 1) &&
1869 			    (netq->net_qhead->handle == NULL))
1870 				(void) nsc_free_buf(buf);
1871 				*abuf = NULL;
1872 
1873 		}
1874 		rdc_discard_tmpq(netq);
1875 	}
1876 
1877 	mutex_enter(QLOCK(dq));
1878 	rdc_dump_iohdrs(dq);
1879 	mutex_exit(QLOCK(dq));
1880 
1881 	if (fail) { /* real failure, not just state change */
1882 #ifdef DEBUG
1883 		cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s",
1884 		    urdc->disk_queue);
1885 #endif
1886 		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
1887 	}
1888 
1889 	return (NULL);
1890 
1891 }
1892 
1893 /*
1894  * rdc_diskq_unqueue
1895  * remove one chunk from the diskq belonging to
1896  * rdc_k_info[index]
1897  * updates the head and tail pointers in the disk header
1898  * but does not write. The header should be written on ack
1899  * flusher should free whatever..
1900  */
1901 rdc_aio_t *
1902 rdc_diskq_unqueue(int index)
1903 {
1904 	int rc, rc1, rc2;
1905 	nsc_off_t qhead;
1906 	int nullhandle = 0;
1907 	io_hdr *iohdr;
1908 	rdc_aio_t *aio = NULL;
1909 	nsc_buf_t *buf = NULL;
1910 	nsc_buf_t *abuf = NULL;
1911 	rdc_group_t *group = NULL;
1912 	disk_queue *q = NULL;
1913 	rdc_k_info_t *krdc = &rdc_k_info[index];
1914 	rdc_u_info_t *urdc = &rdc_u_info[index];
1915 
1916 	group = krdc->group;
1917 	q = &group->diskq;
1918 
1919 	if (group->diskqfd == NULL) /* we've been disabled */
1920 		return (NULL);
1921 
1922 	aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1923 	if (!aio) {
1924 		return (NULL);
1925 	}
1926 
1927 	iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP);
1928 	if (!iohdr) {
1929 		kmem_free(aio, sizeof (*aio));
1930 		return (NULL);
1931 	}
1932 
1933 	mutex_enter(QLOCK(q));
1934 	rdc_set_qbusy(q); /* make sure no one disables the queue */
1935 	mutex_exit(QLOCK(q));
1936 
1937 	DTRACE_PROBE(rdc_diskq_unq_rsrv);
1938 
1939 	if (_rdc_rsrv_diskq(group)) {
1940 		cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed",
1941 		    urdc->disk_queue);
1942 		goto fail;
1943 	}
1944 
1945 	mutex_enter(QHEADLOCK(q));
1946 	mutex_enter(QLOCK(q));
1947 
1948 	if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) {
1949 		rdc_clr_qbusy(q);
1950 		mutex_exit(QLOCK(q));
1951 		mutex_exit(QHEADLOCK(q));
1952 		kmem_free(aio, sizeof (*aio));
1953 		kmem_free(iohdr, sizeof (*iohdr));
1954 		return (NULL);
1955 	}
1956 
1957 	if (QNXTIOSHLDWRAP(q)) {
1958 #ifdef DEBUG_DISKQWRAP
1959 		cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q));
1960 #endif
1961 		/*LINTED*/
1962 		WRAPQNXTIO(q);
1963 	}
1964 
1965 	/* read the metainfo at q->nxt_io first */
1966 	if (QNXTIO(q) == QTAIL(q)) { /* empty */
1967 
1968 		_rdc_rlse_diskq(group);
1969 		if (q->lastio->handle)
1970 			(void) nsc_free_buf(q->lastio->handle);
1971 		bzero(&(*q->lastio), sizeof (*q->lastio));
1972 
1973 		mutex_exit(QHEADLOCK(q));
1974 		rdc_clr_qbusy(q);
1975 		mutex_exit(QLOCK(q));
1976 		kmem_free(aio, sizeof (*aio));
1977 		kmem_free(iohdr, sizeof (*iohdr));
1978 		return (NULL);
1979 	}
1980 
1981 	qhead = QNXTIO(q);
1982 
1983 	/*
1984 	 * have to drop the lock here, sigh. Cannot block incoming io
1985 	 * we have to wait until after this read to find out how
1986 	 * much to increment QNXTIO. Might as well grab the seq then too
1987 	 */
1988 
1989 	while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) {
1990 		mutex_exit(QLOCK(q));
1991 #ifdef DEBUG_DISKQ
1992 		cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead);
1993 #endif
1994 		delay(5);
1995 		mutex_enter(QLOCK(q));
1996 	}
1997 	mutex_exit(QLOCK(q));
1998 
1999 	DTRACE_PROBE(rdc_diskq_iohdr_read_start);
2000 
2001 	rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead,
2002 	    (uchar_t *)iohdr, FBA_SIZE(1));
2003 
2004 	DTRACE_PROBE(rdc_diskq_iohdr_read_end);
2005 
2006 	if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) {
2007 		cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s"
2008 		    " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue,
2009 		    qhead, rc);
2010 #ifdef DEBUG_DISKQ
2011 		cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
2012 #endif
2013 		mutex_exit(QHEADLOCK(q));
2014 		goto fail;
2015 	}
2016 
2017 /* XXX process buffer here, creating rdc_aio_t's */
2018 
2019 	mutex_enter(QLOCK(q));
2020 	/* update the next pointer */
2021 	if (iohdr->dat.flag == RDC_NULL_BUF) {
2022 		INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr)));
2023 		nullhandle = 1;
2024 	} else {
2025 		INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len));
2026 	}
2027 
2028 	aio->seq = group->seq++;
2029 	if (group->seq < aio->seq)
2030 		group->seq = RDC_NEWSEQ + 1;
2031 
2032 	mutex_exit(QLOCK(q));
2033 	mutex_exit(QHEADLOCK(q));
2034 
2035 #ifdef DEBUG_FLUSHER_UBERNOISE
2036 	p = &iohdr->dat;
2037 	cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d "
2038 	    "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len,
2039 	    p->flag, p->iostatus, p->setid, p->time);
2040 #endif
2041 
2042 	if (nullhandle) /* nothing to get from queue */
2043 		goto nullbuf;
2044 
2045 	/* now that we know how much to get (iohdr.dat.len), get it */
2046 	DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start);
2047 
2048 	rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len,
2049 	    NSC_NOCACHE | NSC_READ, &buf);
2050 
2051 	DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end);
2052 
2053 	/* and get somewhere to keep it for a bit */
2054 	DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start);
2055 
2056 	rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf);
2057 
2058 	DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end);
2059 
2060 	if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */
2061 		cmn_err(CE_WARN, "!disk queue %s read failure",
2062 		    urdc->disk_queue);
2063 		goto fail;
2064 	}
2065 
2066 	/* move it on over... */
2067 	rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len);
2068 
2069 	if (!RDC_SUCCESS(rc2)) {
2070 #ifdef DEBUG
2071 		cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue");
2072 #endif
2073 		goto fail;
2074 	}
2075 
2076 	/* let go of the real buf, we've got the abuf  */
2077 	(void) nsc_free_buf(buf);
2078 	buf = NULL;
2079 
2080 	aio->handle = abuf;
2081 	/* Hack in the original sb_pos */
2082 	aio->handle->sb_pos = iohdr->dat.hpos;
2083 
2084 	/* skip the RDC_HANDLE_LIMITS check */
2085 	abuf->sb_user |= RDC_DISKQUE;
2086 
2087 nullbuf:
2088 	if (nullhandle) {
2089 		aio->handle = NULL;
2090 	}
2091 
2092 	/* set up the rest of the aio values, seq set above ... */
2093 	aio->pos = iohdr->dat.pos;
2094 	aio->qpos = iohdr->dat.qpos;
2095 	aio->len = iohdr->dat.len;
2096 	aio->flag = iohdr->dat.flag;
2097 	aio->index = rdc_setid2idx(iohdr->dat.setid);
2098 	if (aio->index < 0) { /* uh-oh */
2099 #ifdef DEBUG
2100 		cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0");
2101 #endif
2102 		goto fail;
2103 	}
2104 
2105 
2106 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP
2107 	h = &q->disk_hdr.h;
2108 	cmn_err(CE_NOTE, "!stamping diskq header:\n"
2109 	    "magic: %x\nstate: %d\nhead_offset: %d\n"
2110 	    "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n",
2111 	    h->magic, h->state, h->head_offset, h->tail_offset,
2112 	    h->disk_size, h->nitems, h->blocks);
2113 #endif
2114 
2115 	_rdc_rlse_diskq(group);
2116 
2117 	mutex_enter(QLOCK(q));
2118 	rdc_clr_qbusy(q);
2119 	mutex_exit(QLOCK(q));
2120 
2121 	DTRACE_PROBE(rdc_diskq_unq_rlse);
2122 
2123 	iohdr->dat.iostatus = aio->seq;
2124 	rdc_add_iohdr(iohdr, group);
2125 
2126 #ifdef DEBUG_FLUSHER_UBERNOISE
2127 	if (!nullhandle) {
2128 		cmn_err(CE_NOTE, "!UNQUEUING, %p"
2129 		    " contents: %c%c%c%c%c pos: %d len: %d",
2130 		    (void *)aio->handle,
2131 		    aio->handle->sb_vec[0].sv_addr[0],
2132 		    aio->handle->sb_vec[0].sv_addr[1],
2133 		    aio->handle->sb_vec[0].sv_addr[2],
2134 		    aio->handle->sb_vec[0].sv_addr[3],
2135 		    aio->handle->sb_vec[0].sv_addr[4],
2136 		    aio->handle->sb_pos, aio->handle->sb_len);
2137 	} else {
2138 		cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q));
2139 	}
2140 	cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
2141 #endif
2142 
2143 	return (aio);
2144 
2145 fail:
2146 	if (aio)
2147 		kmem_free(aio, sizeof (*aio));
2148 	if (iohdr)
2149 		kmem_free(iohdr, sizeof (*iohdr));
2150 	if (buf)
2151 		(void) nsc_free_buf(buf);
2152 	if (abuf)
2153 		(void) nsc_free_buf(abuf);
2154 
2155 	_rdc_rlse_diskq(group);
2156 #ifdef DEBUG
2157 	cmn_err(CE_WARN, "!diskq_unqueue: failing diskq");
2158 #endif
2159 	mutex_enter(QLOCK(q));
2160 	rdc_clr_qbusy(q);
2161 	mutex_exit(QLOCK(q));
2162 
2163 	rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2164 
2165 	return (NULL);
2166 }
2167 
2168 int
2169 rdc_diskq_inuse(rdc_set_t *set, char *diskq)
2170 {
2171 	rdc_u_info_t *urdc;
2172 	char *group;
2173 	int index;
2174 
2175 	group = set->group_name;
2176 
2177 	ASSERT(MUTEX_HELD(&rdc_conf_lock));
2178 
2179 	if ((rdc_lookup_bitmap(diskq) >= 0) ||
2180 	    (rdc_lookup_configured(diskq) >= 0)) {
2181 		return (1);
2182 	}
2183 	for (index = 0; index < rdc_max_sets; index++) {
2184 		urdc = &rdc_u_info[index];
2185 
2186 		if (!IS_ENABLED(urdc))
2187 			continue;
2188 
2189 		/* same diskq different group */
2190 		if ((strcmp(urdc->disk_queue, diskq) == 0) &&
2191 		    (urdc->group_name[0] == '\0' ||
2192 		    strcmp(urdc->group_name, group))) {
2193 			return (1);
2194 		}
2195 	}
2196 	/* last, but not least, lets see if someone is getting really funky */
2197 	if ((strcmp(set->disk_queue, set->primary.file) == 0) ||
2198 	    (strcmp(set->disk_queue, set->primary.bitmap) == 0)) {
2199 		return (1);
2200 	}
2201 
2202 	return (0);
2203 
2204 }
2205 
2206 #ifdef DEBUG
int maxlen = 0;
int avelen = 0;
int totalen = 0;
int lencalls = 0;

/*
 * update_lenstats()
 * DEBUG statistics: fold one more read length into the maximum,
 * total and average, where lencalls is the number of calls so far.
 * A call count of zero restarts the statistics with this call
 * counted as the first one.
 */
void
update_lenstats(int len)
{
	if (lencalls == 0) {
		maxlen = avelen = totalen = 0;
		lencalls = 1;
	}

	totalen += len;
	maxlen = (len > maxlen) ? len : maxlen;
	avelen = totalen / lencalls;
}
2227 #endif
2228 
2229 /*
2230  * rdc_calc_len()
2231  * returns the size of the diskq that can be read for dequeuing
2232  * always <= RDC_MAX_DISKQREAD
2233  */
2234 int
2235 rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq)
2236 {
2237 	nsc_size_t len = 0;
2238 
2239 	ASSERT(MUTEX_HELD(QLOCK(dq)));
2240 
2241 	/* ---H-----N-----T--- */
2242 	if (QNXTIO(dq) < QTAIL(dq)) {
2243 
2244 		len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq));
2245 
2246 	/* ---T-----H-----N--- */
2247 	} else if (QNXTIO(dq) > QTAIL(dq)) {
2248 		if (QWRAP(dq)) {
2249 			len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2250 		} else { /* should never happen */
2251 			len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq));
2252 		}
2253 	} else if (QNXTIO(dq) == QTAIL(dq)) {
2254 		if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD))
2255 			len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2256 	}
2257 
2258 	len = min(len, krdc->maxfbas);
2259 
2260 #ifdef DEBUG
2261 	lencalls++;
2262 	update_lenstats(len);
2263 #endif
2264 
2265 	return ((int)len);
2266 }
2267 
2268 /*
2269  * lie a little if we can, so we don't get tied up in
2270  * _nsc_wait_dbuf() on the next read. sb_len MUST be
2271  * restored before nsc_free_buf() however, or we will
2272  * be looking at memory leak city..
2273  * so update the entire queue with the info as well
2274  * and the one that ends up freeing it, can fix the len
2275  * IMPORTANT: This assumes that we are not cached, in
2276  * 3.2 caching was turned off for data volumes, if that
2277  * changes, then this must too
2278  */
2279 void
2280 rdc_trim_buf(nsc_buf_t *buf, net_queue *q)
2281 {
2282 	rdc_aio_t *p;
2283 	int len;
2284 
2285 	if (buf == NULL || q == NULL)
2286 		return;
2287 
2288 	if (q && (buf->sb_len >
2289 	    (q->blocks + q->nitems - q->net_qtail->orig_len))) {
2290 		len = buf->sb_len;
2291 		buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len);
2292 	}
2293 
2294 	p = q->net_qhead;
2295 	do {
2296 		p->orig_len = len;
2297 		p = p->next;
2298 
2299 	} while (p);
2300 
2301 }
2302 
2303 /*
2304  * rdc_read_diskq_buf()
2305  * read a large as possible chunk of the diskq into a nsc_buf_t
2306  * and convert it to a net_queue of rdc_aio_t's to be appended
2307  * to the group's netqueue
2308  */
2309 net_queue *
2310 rdc_read_diskq_buf(int index)
2311 {
2312 	nsc_buf_t *buf = NULL;
2313 	net_queue *tmpnq = NULL;
2314 	disk_queue *dq = NULL;
2315 	rdc_k_info_t *krdc = &rdc_k_info[index];
2316 	rdc_u_info_t *urdc = &rdc_u_info[index];
2317 	rdc_group_t *group = krdc->group;
2318 	net_queue *nq = &group->ra_queue;
2319 	int len = 0;
2320 	int rc;
2321 	int fail = 0;
2322 	int offset = 0;
2323 
2324 	if (group == NULL || group->diskqfd == NULL) {
2325 		DTRACE_PROBE(rdc_read_diskq_buf_bail1);
2326 		return (NULL);
2327 	}
2328 
2329 	dq = &group->diskq;
2330 
2331 	mutex_enter(QLOCK(dq));
2332 	rdc_set_qbusy(dq); /* prevent disables on the queue */
2333 	mutex_exit(QLOCK(dq));
2334 
2335 	if (_rdc_rsrv_diskq(group)) {
2336 		cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed",
2337 		    urdc->disk_queue);
2338 		mutex_enter(QLOCK(dq));
2339 		rdc_clr_qbusy(dq); /* prevent disables on the queue */
2340 		mutex_exit(QLOCK(dq));
2341 		return (NULL);
2342 	}
2343 
2344 	mutex_enter(QHEADLOCK(dq));
2345 	mutex_enter(QLOCK(dq));
2346 
2347 	if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
2348 	    IS_STATE(urdc, RDC_LOGGING) ||
2349 	    (nq->qfflags & RDC_QFILLSLEEP)) {
2350 		mutex_exit(QLOCK(dq));
2351 		mutex_exit(QHEADLOCK(dq));
2352 		DTRACE_PROBE(rdc_read_diskq_buf_bail2);
2353 		goto done;
2354 	}
2355 
2356 	/*
2357 	 * real corner case here, we need to let the flusher wrap first.
2358 	 * we've gotten too far ahead, so just delay and try again
2359 	 */
2360 	if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) {
2361 		mutex_exit(QLOCK(dq));
2362 		mutex_exit(QHEADLOCK(dq));
2363 		goto done;
2364 	}
2365 
2366 	if (QNXTIOSHLDWRAP(dq)) {
2367 #ifdef DEBUG_DISKQWRAP
2368 		cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq));
2369 #endif
2370 		/*LINTED*/
2371 		WRAPQNXTIO(dq);
2372 	}
2373 
2374 	/* read the metainfo at q->nxt_io first */
2375 	if (!QNITEMS(dq)) { /* empty */
2376 
2377 		if (dq->lastio->handle)
2378 			(void) nsc_free_buf(dq->lastio->handle);
2379 		bzero(&(*dq->lastio), sizeof (*dq->lastio));
2380 		mutex_exit(QLOCK(dq));
2381 		mutex_exit(QHEADLOCK(dq));
2382 		DTRACE_PROBE(rdc_read_diskq_buf_bail3);
2383 		goto done;
2384 	}
2385 
2386 
2387 	len = rdc_calc_len(krdc, dq);
2388 
2389 	if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) ||
2390 	    (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2391 	    (nq->qfflags & RDC_QFILLSLEEP)) {
2392 		mutex_exit(QLOCK(dq));
2393 		mutex_exit(QHEADLOCK(dq));
2394 		/*
2395 		 * a write could be trying to get on the queue, or if
2396 		 * the queue is really really small, a complete image
2397 		 * of it could be on the net queue waiting for flush.
2398 		 * the latter being a fairly stupid scenario and a gross
2399 		 * misconfiguration.. but what the heck, why make the thread
2400 		 * thrash around.. just pause a little here.
2401 		 */
2402 		if (len <= 0)
2403 			delay(50);
2404 
2405 		DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len,
2406 		    int, rdc_get_vflags(urdc), int, nq->qfflags);
2407 
2408 		goto done;
2409 	}
2410 
2411 	DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq));
2412 
2413 #ifdef DEBUG_FLUSHER_UBERNOISE
2414 	cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d",
2415 	    len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq));
2416 	cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq));
2417 #endif
2418 	SET_QCOALBOUNDS(dq, QNXTIO(dq) + len);
2419 
2420 	while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) &&
2421 	    ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) &&
2422 	    (IS_QSTATE(dq, QTAILBUSY))) {
2423 		mutex_exit(QLOCK(dq));
2424 
2425 #ifdef DEBUG_FLUSHER_UBERNOISE
2426 		cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d "
2427 		    "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq));
2428 #endif
2429 		delay(20);
2430 		mutex_enter(QLOCK(dq));
2431 	}
2432 
2433 	offset = QNXTIO(dq);
2434 
2435 	/*
2436 	 * one last check to see if we have gone logging, or should.
2437 	 * we may have released the mutex above, so check again
2438 	 */
2439 	if ((IS_STATE(urdc, RDC_LOGGING)) ||
2440 	    (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2441 	    (nq->qfflags & RDC_QFILLSLEEP)) {
2442 		mutex_exit(QLOCK(dq));
2443 		mutex_exit(QHEADLOCK(dq));
2444 		goto done;
2445 	}
2446 
2447 	mutex_exit(QLOCK(dq));
2448 	mutex_exit(QHEADLOCK(dq));
2449 
2450 	DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len);
2451 
2452 	rc = nsc_alloc_buf(group->diskqfd, offset, len,
2453 	    NSC_NOCACHE | NSC_READ, &buf);
2454 
2455 	if (!RDC_SUCCESS(rc)) {
2456 		cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT
2457 		    " len %d", urdc->disk_queue, QNXTIO(dq), len);
2458 		fail++;
2459 		buf = NULL;
2460 		DTRACE_PROBE(rdc_read_diskq_buf_bail5);
2461 		goto done;
2462 	}
2463 
2464 	DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len);
2465 
2466 	/*
2467 	 * convert buf to a net_queue. buf2queue will
2468 	 * update the QNXTIO pointer for us, based on
2469 	 * the last readable queue item
2470 	 */
2471 	tmpnq = rdc_diskq_buf2queue(group, &buf, index);
2472 
2473 #ifdef DEBUG_FLUSHER_UBERNOISE
2474 	cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d ",
2475 	    "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len,
2476 	    buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1,
2477 	    tmpnq?tmpnq->nitems:-1,
2478 	    tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1);
2479 #endif
2480 
2481 	DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0,
2482 	    uint64_t, tmpnq?tmpnq->nitems:0,
2483 	    uint_t, tmpnq?tmpnq->net_qhead->seq:0);
2484 done:
2485 
2486 	/* we don't need to retain the buf */
2487 	if (tmpnq == NULL)
2488 		if (buf) {
2489 			(void) nsc_free_buf(buf);
2490 			buf = NULL;
2491 		}
2492 
2493 	rdc_trim_buf(buf, tmpnq);
2494 
2495 	mutex_enter(QLOCK(dq));
2496 	rdc_clr_qbusy(dq);
2497 	mutex_exit(QLOCK(dq));
2498 
2499 	_rdc_rlse_diskq(group);
2500 
2501 	if (fail) {
2502 		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2503 		tmpnq = NULL;
2504 	}
2505 
2506 	return (tmpnq);
2507 }
2508 
2509 /*
2510  * rdc_dequeue()
2511  * removes the head of the memory queue
2512  */
2513 rdc_aio_t *
2514 rdc_dequeue(rdc_k_info_t *krdc, int *rc)
2515 {
2516 	net_queue *q = &krdc->group->ra_queue;
2517 	disk_queue *dq = &krdc->group->diskq;
2518 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2519 	rdc_aio_t *aio;
2520 
2521 	*rc = 0;
2522 
2523 	if (q == NULL)
2524 		return (NULL);
2525 
2526 	mutex_enter(&q->net_qlock);
2527 
2528 	aio = q->net_qhead;
2529 
2530 	if (aio == NULL) {
2531 #ifdef DEBUG
2532 		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2533 			cmn_err(CE_PANIC,
2534 			    "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT
2535 			    " , nitems %" NSC_SZFMT ", qhead %p qtail %p",
2536 			    (void *) q, q->blocks, q->nitems,
2537 			    (void *) aio, (void *) q->net_qtail);
2538 		}
2539 #endif
2540 
2541 		mutex_exit(&q->net_qlock);
2542 
2543 		if ((!IS_STATE(urdc, RDC_LOGGING)) &&
2544 		    (!(q->qfflags & RDC_QFILLSLEEP)) &&
2545 		    (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) {
2546 			*rc = EAGAIN;
2547 		}
2548 
2549 		goto done;
2550 	}
2551 
2552 	/* aio remove from q */
2553 
2554 	q->net_qhead = aio->next;
2555 	aio->next = NULL;
2556 
2557 	if (q->net_qtail == aio)
2558 		q->net_qtail = q->net_qhead;
2559 
2560 	q->blocks -= aio->len;
2561 	q->nitems--;
2562 
2563 #ifdef DEBUG
2564 	if (q->net_qhead == NULL) {
2565 		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2566 			cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %"
2567 			    NSC_SZFMT " nitems %" NSC_SZFMT
2568 			    " , qhead %p qtail %p",
2569 			    (void *) q, q->blocks, q->nitems,
2570 			    (void *) q->net_qhead, (void *) q->net_qtail);
2571 		}
2572 	}
2573 #endif
2574 	mutex_exit(&q->net_qlock);
2575 done:
2576 
2577 	mutex_enter(&q->net_qlock);
2578 
2579 	if (rdc_qfill_shldwakeup(krdc))
2580 		cv_broadcast(&q->qfcv);
2581 
2582 	/*
2583 	 * clear EAGAIN if
2584 	 * logging or q filler thread is sleeping or stopping altogether
2585 	 * or if q filler thread is dead already
2586 	 * or if syncing, this will return a null aio, with no error code set
2587 	 * telling the flusher to die
2588 	 */
2589 	if (*rc == EAGAIN) {
2590 		if (IS_STATE(urdc, RDC_LOGGING) ||
2591 		    (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) ||
2592 		    (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) ||
2593 		    (q->qfill_sleeping == RDC_QFILL_DEAD) ||
2594 		    (IS_STATE(urdc, RDC_SYNCING)))
2595 			*rc = 0;
2596 	}
2597 
2598 	mutex_exit(&q->net_qlock);
2599 
2600 	return (aio);
2601 
2602 }
2603 
2604 /*
2605  * rdc_qfill_shldsleep()
2606  * returns 1 if the qfilling code should cv_wait() 0 if not.
2607  * reasons for going into cv_wait();
2608  * there is nothing in the diskq to flush to mem.
2609  * the memory queue has gotten too big and needs more flushing attn.
2610  */
2611 int
2612 rdc_qfill_shldsleep(rdc_k_info_t *krdc)
2613 {
2614 	net_queue *nq = &krdc->group->ra_queue;
2615 	disk_queue *dq = &krdc->group->diskq;
2616 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2617 
2618 	ASSERT(MUTEX_HELD(&nq->net_qlock));
2619 
2620 	if (!RDC_IS_DISKQ(krdc->group))
2621 		return (1);
2622 
2623 	if (nq->qfflags & RDC_QFILLSLEEP) {
2624 #ifdef DEBUG_DISKQ_NOISY
2625 	cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QFILLSLEEP idx: %d",
2626 	    krdc->index);
2627 #endif
2628 		return (1);
2629 	}
2630 
2631 	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2632 #ifdef DEBUG_DISKQ_NOISY
2633 	cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log (0x%x)"
2634 	    " idx: %d", rdc_get_vflags(urdc), urdc->index);
2635 #endif
2636 		return (1);
2637 	}
2638 
2639 	mutex_enter(QLOCK(dq));
2640 	if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
2641 #ifdef DEBUG_DISKQ_NOISY
2642 		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY");
2643 #endif
2644 		mutex_exit(QLOCK(dq));
2645 		return (1);
2646 	}
2647 	mutex_exit(QLOCK(dq));
2648 
2649 	if (nq->blocks >= RDC_MAX_QBLOCKS) {
2650 		nq->hwmhit = 1;
2651 		/* stuck flushers ? */
2652 #ifdef DEBUG_DISKQ_NOISY
2653 		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:"
2654 		    " seq: %d seqack %d", krdc->group->seq,
2655 		    krdc->group->seqack);
2656 #endif
2657 		return (1);
2658 	}
2659 
2660 	return (0);
2661 }
2662 
2663 /*
2664  * rdc_join_netqueues(a, b)
2665  * appends queue b to queue a updating all the queue info
2666  * as it is assumed queue a is the important one,
2667  * it's mutex must be held. no one can add to queue b
2668  */
2669 void
2670 rdc_join_netqueues(net_queue *q, net_queue *tmpq)
2671 {
2672 	ASSERT(MUTEX_HELD(&q->net_qlock));
2673 
2674 	if (q->net_qhead == NULL) { /* empty */
2675 #ifdef DEBUG
2676 		if (q->blocks != 0 || q->nitems != 0) {
2677 			cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, "
2678 			    " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT,
2679 			    (void *) q, q->blocks, q->nitems);
2680 		}
2681 #endif
2682 		q->net_qhead = tmpq->net_qhead;
2683 		q->net_qtail = tmpq->net_qtail;
2684 		q->nitems = tmpq->nitems;
2685 		q->blocks = tmpq->blocks;
2686 	} else {
2687 		q->net_qtail->next = tmpq->net_qhead;
2688 		q->net_qtail = tmpq->net_qtail;
2689 		q->nitems += tmpq->nitems;
2690 		q->blocks += tmpq->blocks;
2691 	}
2692 
2693 	if (q->nitems > q->nitems_hwm) {
2694 		q->nitems_hwm = q->nitems;
2695 	}
2696 
2697 	if (q->blocks > q->blocks_hwm) {
2698 		q->blocks_hwm = q->blocks;
2699 	}
2700 }
2701 
/*
 * rdc_qfiller_thr() single thread that moves
 * data from the diskq to a memory queue for
 * the flusher to pick up.
 *
 * Runs until RDC_QFILLSTOP is seen in the memory queue's qfflags.
 * Each pass reads a chunk of the disk queue via rdc_read_diskq_buf(),
 * appends it to the group's memory queue, then sleeps on qfcv for as
 * long as rdc_qfill_shldsleep() says there is nothing to do.
 * On exit qfill_sleeping is set to RDC_QFILL_DEAD and RDC_QFILLSTOP
 * is cleared.
 */
void
rdc_qfiller_thr(rdc_k_info_t *krdc)
{
	rdc_group_t *grp = krdc->group;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	net_queue *q = &grp->ra_queue;
	net_queue *tmpq = NULL;
	int index = krdc->index;

	q->qfill_sleeping = RDC_QFILL_AWAKE;
	while (!(q->qfflags & RDC_QFILLSTOP)) {
		/* nothing to read in these states; go straight to the wait */
		if (!RDC_IS_DISKQ(grp) ||
		    IS_STATE(urdc, RDC_LOGGING) ||
		    IS_STATE(urdc, RDC_DISKQ_FAILED) ||
		    (q->qfflags & RDC_QFILLSLEEP)) {
			goto nulltmpq;
		}

		DTRACE_PROBE(qfiller_top);
		tmpq = rdc_read_diskq_buf(index);

		if (tmpq == NULL)
			goto nulltmpq;

		/* state may have changed while reading; drop what we read */
		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			goto nulltmpq;
		}

		mutex_enter(&q->net_qlock);

		/* race with log, redundant yet paranoid */
		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			mutex_exit(&q->net_qlock);
			goto nulltmpq;
		}


		/* items now belong to q; only the tmpq container is freed */
		rdc_join_netqueues(q, tmpq);
		kmem_free(tmpq, sizeof (*tmpq));
		tmpq = NULL;

		mutex_exit(&q->net_qlock);
nulltmpq:
		/*
		 * sleep for a while if we can.
		 * the enqueuing or flushing code will
		 * wake us if necessary.
		 */
		mutex_enter(&q->net_qlock);
		while (rdc_qfill_shldsleep(krdc)) {
			q->qfill_sleeping = RDC_QFILL_ASLEEP;
			DTRACE_PROBE(qfiller_sleep);
			cv_wait(&q->qfcv, &q->net_qlock);
			DTRACE_PROBE(qfiller_wakeup);
			q->qfill_sleeping = RDC_QFILL_AWAKE;
			/* a stop request ends the thread, not just the wait */
			if (q->qfflags & RDC_QFILLSTOP) {
#ifdef DEBUG_DISKQ
			cmn_err(CE_NOTE,
			    "!rdc_qfiller_thr: recieved kill signal");
#endif
				mutex_exit(&q->net_qlock);
				goto done;
			}
		}
		mutex_exit(&q->net_qlock);

	DTRACE_PROBE(qfiller_bottom);
	}
done:
	DTRACE_PROBE(qfiller_done);
	q->qfill_sleeping = RDC_QFILL_DEAD; /* the big sleep */

#ifdef DEBUG
	cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping");
#endif
	/* acknowledge the stop request by clearing the flag */
	q->qfflags &= ~RDC_QFILLSTOP;

}
2789 
2790 int
2791 _rdc_add_diskq(int index, char *diskq)
2792 {
2793 	rdc_k_info_t *krdc, *kp;
2794 	rdc_u_info_t *urdc, *up;
2795 	rdc_group_t *group;
2796 	int rc;
2797 
2798 	krdc = &rdc_k_info[index];
2799 	urdc = &rdc_u_info[index];
2800 	group = krdc->group;
2801 
2802 	if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */
2803 #ifdef DEBUG
2804 		cmn_err(CE_WARN, "!NULL diskq in _rdc_add_diskq");
2805 #endif
2806 		rc = -1;
2807 		goto fail;
2808 	}
2809 
2810 	/* if the enable fails, this is bzero'ed */
2811 	(void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH);
2812 	group->flags &= ~RDC_MEMQUE;
2813 	group->flags |= RDC_DISKQUE;
2814 
2815 #ifdef DEBUG
2816 	cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name);
2817 #endif
2818 	mutex_enter(&rdc_conf_lock);
2819 	rc = rdc_enable_diskq(krdc);
2820 	mutex_exit(&rdc_conf_lock);
2821 
2822 	if (rc == RDC_EQNOADD) {
2823 		goto fail;
2824 	}
2825 
2826 	RDC_ZERO_BITREF(krdc);
2827 	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
2828 		up = &rdc_u_info[kp->index];
2829 		(void) strncpy(up->disk_queue, diskq, NSC_MAXPATH);
2830 		/* size lives in the diskq structure, already set by enable */
2831 		RDC_ZERO_BITREF(kp);
2832 	}
2833 
2834 fail:
2835 	return (rc);
2836 
2837 }
2838 
2839 /*
2840  * add a diskq to an existing set/group
2841  */
2842 int
2843 rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2844 {
2845 	char *diskq;
2846 	int rc;
2847 	int index;
2848 	rdc_k_info_t *krdc, *this;
2849 	rdc_u_info_t *urdc;
2850 	rdc_group_t *group;
2851 	nsc_size_t vol_size = 0;
2852 	nsc_size_t req_size = 0;
2853 
2854 	mutex_enter(&rdc_conf_lock);
2855 	index = rdc_lookup_byname(uparms->rdc_set);
2856 	mutex_exit(&rdc_conf_lock);
2857 	if (index < 0) {
2858 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
2859 		    uparms->rdc_set->secondary.file);
2860 		rc = RDC_EALREADY;
2861 		goto failed;
2862 	}
2863 	urdc = &rdc_u_info[index];
2864 	krdc = &rdc_k_info[index];
2865 	this = &rdc_k_info[index];
2866 	group = krdc->group;
2867 	diskq = uparms->rdc_set->disk_queue;
2868 
2869 	if (!IS_ASYNC(urdc)) {
2870 		spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf,
2871 		    urdc->primary.file, urdc->secondary.intf,
2872 		    urdc->secondary.file);
2873 		rc = RDC_EQNOQUEUE;
2874 		goto failed;
2875 	}
2876 
2877 	do {
2878 		if (!IS_STATE(urdc, RDC_LOGGING)) {
2879 			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
2880 			    uparms->rdc_set->disk_queue);
2881 			rc = RDC_EQNOTLOGGING;
2882 			goto failed;
2883 		}
2884 		/* make sure that we have enough bitmap vol */
2885 		req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size);
2886 		req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE);
2887 
2888 		rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL);
2889 
2890 		if (!RDC_SUCCESS(rc)) {
2891 			cmn_err(CE_WARN,
2892 			    "!rdc_open_diskq: Bitmap reserve failed");
2893 			spcs_s_add(kstatus, RDC_EBITMAP,
2894 			    urdc->primary.bitmap);
2895 			rc = RDC_EBITMAP;
2896 			goto failed;
2897 		}
2898 
2899 		(void) nsc_partsize(krdc->bitmapfd, &vol_size);
2900 
2901 		_rdc_rlse_devs(krdc, RDC_BMP);
2902 
2903 		if (vol_size < req_size) {
2904 			spcs_s_add(kstatus, RDC_EBITMAP2SMALL,
2905 			    urdc->primary.bitmap);
2906 			rc = RDC_EBITMAP2SMALL;
2907 			goto failed;
2908 		}
2909 
2910 		krdc = krdc->group_next;
2911 		urdc = &rdc_u_info[krdc->index];
2912 
2913 	} while (krdc != this);
2914 
2915 	if (urdc->disk_queue[0] != '\0') {
2916 		spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf,
2917 		    urdc->primary.file, urdc->secondary.intf,
2918 		    urdc->secondary.file);
2919 		rc = RDC_EQALREADY;
2920 		goto failed;
2921 	}
2922 
2923 	if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */
2924 		spcs_s_add(kstatus, RDC_EQWRONGMODE);
2925 		rc = RDC_EQWRONGMODE;
2926 		goto failed;
2927 	}
2928 
2929 	mutex_enter(&rdc_conf_lock);
2930 	if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) {
2931 		spcs_s_add(kstatus, RDC_EDISKQINUSE,
2932 		    uparms->rdc_set->disk_queue);
2933 		rc = RDC_EDISKQINUSE;
2934 		mutex_exit(&rdc_conf_lock);
2935 		goto failed;
2936 	}
2937 	mutex_exit(&rdc_conf_lock);
2938 
2939 	rdc_group_enter(krdc);
2940 	rc = _rdc_add_diskq(urdc->index, diskq);
2941 	if (rc < 0 || rc == RDC_EQNOADD) {
2942 		group->flags &= ~RDC_DISKQUE;
2943 		group->flags |= RDC_MEMQUE;
2944 		spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue);
2945 		rc = RDC_EQNOADD;
2946 	}
2947 	rdc_group_exit(krdc);
2948 failed:
2949 	return (rc);
2950 }
2951 
2952 int
2953 _rdc_init_diskq(rdc_k_info_t *krdc)
2954 {
2955 	rdc_group_t *group = krdc->group;
2956 	disk_queue  *q = &group->diskq;
2957 
2958 	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
2959 	SET_QNXTIO(q, QHEAD(q));
2960 
2961 	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0)
2962 		goto fail;
2963 
2964 	return (0);
2965 fail:
2966 	return (-1);
2967 }
2968 
2969 /*
2970  * inititalize the disk queue. This is a destructive
2971  * operation that will not check for emptiness of the queue.
2972  */
2973 int
2974 rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2975 {
2976 	int rc = 0;
2977 	int index;
2978 	rdc_k_info_t *krdc, *kp;
2979 	rdc_u_info_t *urdc, *up;
2980 	rdc_set_t    *uset;
2981 	rdc_group_t  *group;
2982 	disk_queue   *qp;
2983 
2984 	uset = uparms->rdc_set;
2985 
2986 	mutex_enter(&rdc_conf_lock);
2987 	index = rdc_lookup_byname(uset);
2988 	mutex_exit(&rdc_conf_lock);
2989 	if (index < 0) {
2990 		spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file,
2991 		    uset->secondary.file);
2992 		rc = RDC_EALREADY;
2993 		goto fail;
2994 	}
2995 
2996 	krdc = &rdc_k_info[index];
2997 	urdc = &rdc_u_info[index];
2998 	group = krdc->group;
2999 	qp = &group->diskq;
3000 
3001 	if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) {
3002 		spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue);
3003 		rc = RDC_EQUEISREP;
3004 		goto fail;
3005 	}
3006 
3007 	/*
3008 	 * a couple of big "ifs" here. in the first implementation
3009 	 * neither of these will be possible. This will come into
3010 	 * play when we persist the queue across reboots
3011 	 */
3012 	if (!(uparms->options & RDC_OPT_FORCE_QINIT)) {
3013 		if (!QEMPTY(qp)) {
3014 			if (group->rdc_writer) {
3015 				spcs_s_add(kstatus, RDC_EQFLUSHING,
3016 				    urdc->disk_queue);
3017 				rc = RDC_EQFLUSHING;
3018 			} else {
3019 				spcs_s_add(kstatus, RDC_EQNOTEMPTY,
3020 				    urdc->disk_queue);
3021 				rc = RDC_EQNOTEMPTY;
3022 			}
3023 			goto fail;
3024 		}
3025 	}
3026 
3027 	mutex_enter(QLOCK(qp));
3028 	if (_rdc_init_diskq(krdc) < 0) {
3029 		mutex_exit(QLOCK(qp));
3030 		goto fail;
3031 	}
3032 	rdc_dump_iohdrs(qp);
3033 
3034 	rdc_group_enter(krdc);
3035 
3036 	rdc_clr_flags(urdc, RDC_QUEUING);
3037 	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
3038 		up = &rdc_u_info[kp->index];
3039 		rdc_clr_flags(up, RDC_QUEUING);
3040 	}
3041 	rdc_group_exit(krdc);
3042 
3043 	mutex_exit(QLOCK(qp));
3044 
3045 	return (0);
3046 fail:
3047 	/* generic queue failure */
3048 	if (!rc) {
3049 		spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue);
3050 		rc = RDC_EQINITFAIL;
3051 	}
3052 
3053 	return (rc);
3054 }
3055 
3056 int
3057 _rdc_kill_diskq(rdc_u_info_t *urdc)
3058 {
3059 	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
3060 	rdc_group_t *group = krdc->group;
3061 	disk_queue *q = &group->diskq;
3062 	rdc_u_info_t *up;
3063 	rdc_k_info_t *p;
3064 
3065 	group->flags |= RDC_DISKQ_KILL;
3066 #ifdef DEBUG
3067 	cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue);
3068 #endif
3069 
3070 	mutex_enter(QLOCK(q));
3071 	rdc_init_diskq_header(group, &q->disk_hdr);
3072 	rdc_dump_iohdrs(q);
3073 
3074 	/*
3075 	 * nsc_close the queue and zero out the queue name
3076 	 */
3077 	rdc_wait_qbusy(q);
3078 	rdc_close_diskq(group);
3079 	mutex_exit(QLOCK(q));
3080 	SET_QSIZE(q, 0);
3081 	rdc_clr_flags(urdc, RDC_DISKQ_FAILED);
3082 	bzero(urdc->disk_queue, NSC_MAXPATH);
3083 	for (p = krdc->group_next; p != krdc; p = p->group_next) {
3084 		up = &rdc_u_info[p->index];
3085 		rdc_clr_flags(up, RDC_DISKQ_FAILED);
3086 		bzero(up->disk_queue, NSC_MAXPATH);
3087 	}
3088 
3089 #ifdef DEBUG
3090 	cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue");
3091 #endif
3092 	group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL);
3093 	group->flags |= RDC_MEMQUE;
3094 	return (0);
3095 }
3096 
3097 /*
3098  * remove this diskq regardless of whether it is draining or not
3099  * stops the flusher by invalidating the qdata (ie, instant empty)
3100  * remove the disk qeueue from the group, leaving the group with a memory
3101  * queue.
3102  */
3103 int
3104 rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3105 {
3106 	int rc;
3107 	int index;
3108 	rdc_u_info_t *urdc;
3109 	rdc_k_info_t *krdc;
3110 	rdc_set_t *rdc_set = uparms->rdc_set;
3111 
3112 	mutex_enter(&rdc_conf_lock);
3113 	index = rdc_lookup_byname(uparms->rdc_set);
3114 	mutex_exit(&rdc_conf_lock);
3115 
3116 	if (index < 0) {
3117 		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3118 		    rdc_set->secondary.file);
3119 		rc = RDC_EALREADY;
3120 		goto failed;
3121 	}
3122 
3123 	urdc = &rdc_u_info[index];
3124 	krdc = &rdc_k_info[index];
3125 
3126 	if (!RDC_IS_DISKQ(krdc->group)) {
3127 		spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf,
3128 		    rdc_set->primary.file, rdc_set->secondary.intf,
3129 		    rdc_set->secondary.file);
3130 		rc = RDC_EQNOQUEUE;
3131 		goto failed;
3132 	}
3133 
3134 /*
3135  *	if (!IS_STATE(urdc, RDC_LOGGING)) {
3136  *		spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3137  *		    uparms->rdc_set->disk_queue);
3138  *		rc = RDC_EQNOTLOGGING;
3139  *		goto failed;
3140  *	}
3141  */
3142 	rdc_unintercept_diskq(krdc->group); /* stop protecting queue */
3143 	rdc_group_enter(krdc); /* to prevent further flushing */
3144 	rc = _rdc_kill_diskq(urdc);
3145 	rdc_group_exit(krdc);
3146 
3147 failed:
3148 	return (rc);
3149 }
3150 
3151 /*
3152  * remove a diskq from a group.
3153  * removal of a diskq from a set, or rather
3154  * a set from a queue, is done by reconfigging out
3155  * of the group. This removes the diskq from a whole
3156  * group and replaces it with a memory based queue
3157  */
3158 #define	NUM_RETRIES	15	/* Number of retries to wait if no progress */
3159 int
3160 rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3161 {
3162 	int index;
3163 	rdc_u_info_t *urdc;
3164 	rdc_k_info_t *krdc;
3165 	rdc_k_info_t *this;
3166 	volatile rdc_group_t *group;
3167 	volatile disk_queue *diskq;
3168 	int threads, counter;
3169 	long blocks;
3170 
3171 	mutex_enter(&rdc_conf_lock);
3172 	index = rdc_lookup_byname(uparms->rdc_set);
3173 	mutex_exit(&rdc_conf_lock);
3174 	if (index < 0) {
3175 		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
3176 		    uparms->rdc_set->secondary.file);
3177 		return (RDC_EALREADY);
3178 	}
3179 
3180 	urdc = &rdc_u_info[index];
3181 	this = &rdc_k_info[index];
3182 	krdc = &rdc_k_info[index];
3183 
3184 	do {
3185 		if (!IS_STATE(urdc, RDC_LOGGING)) {
3186 			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3187 			    urdc->disk_queue);
3188 			return (RDC_EQNOTLOGGING);
3189 		}
3190 		krdc = krdc->group_next;
3191 		urdc = &rdc_u_info[krdc->index];
3192 
3193 	} while (krdc != this);
3194 
3195 	/*
3196 	 * If there is no group or diskq configured, we can leave now
3197 	 */
3198 	if (!(group = krdc->group) || !(diskq = &group->diskq))
3199 		return (0);
3200 
3201 
3202 	/*
3203 	 * Wait if not QEMPTY or threads still active
3204 	 */
3205 	counter = 0;
3206 	while (!QEMPTY(diskq) || group->rdc_thrnum) {
3207 
3208 		/*
3209 		 * Capture counters to determine if progress is being made
3210 		 */
3211 		blocks = QBLOCKS(diskq);
3212 		threads = group->rdc_thrnum;
3213 
3214 		/*
3215 		 * Wait
3216 		 */
3217 		delay(HZ);
3218 
3219 		/*
3220 		 * Has the group or disk queue gone away while delayed?
3221 		 */
3222 		if (!(group = krdc->group) || !(diskq = &group->diskq))
3223 			return (0);
3224 
3225 		/*
3226 		 * Are we still seeing progress?
3227 		 */
3228 		if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
3229 			/*
3230 			 * No progress see, decrement retry counter
3231 			 */
3232 			if (counter++ > NUM_RETRIES) {
3233 				/*
3234 				 * No progress seen, increment retry counter
3235 				 */
3236 				int rc = group->rdc_thrnum ?
3237 				    RDC_EQFLUSHING : RDC_EQNOTEMPTY;
3238 				spcs_s_add(kstatus, rc, urdc->disk_queue);
3239 				return (rc);
3240 			}
3241 		} else {
3242 			/*
3243 			 * Reset counter, as we've made progress
3244 			 */
3245 			counter = 0;
3246 		}
3247 	}
3248 
3249 	return (0);
3250 }
3251