1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 /* Network data replicator Client side */
27
28
29 #include <sys/types.h>
30 #include <sys/debug.h>
31 #include <sys/ksynch.h>
32 #include <sys/cmn_err.h>
33 #include <sys/kmem.h>
34 #include <sys/cred.h>
35 #include <sys/byteorder.h>
36 #include <sys/errno.h>
37
38 #ifdef _SunOS_2_6
39 /*
40 * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
41 * define enum_t here as it is all we need from rpc/types.h
42 * anyway and make it look like we included it. Yuck.
43 */
44 #define _RPC_TYPES_H
45 typedef int enum_t;
46 #else
47 #ifndef DS_DDICT
48 #include <rpc/types.h>
49 #endif
50 #endif /* _SunOS_2_6 */
51
52 #ifndef DS_DDICT
53 #include <rpc/auth.h>
54 #include <rpc/svc.h>
55 #include <rpc/xdr.h>
56 #endif
57 #include <sys/ddi.h>
58
59 #include <sys/nsc_thread.h>
60 #ifdef DS_DDICT
61 #include <sys/nsctl/contract.h>
62 #endif
63 #include <sys/nsctl/nsctl.h>
64
65 #include <sys/sdt.h> /* dtrace is S10 or later */
66
67 #include "rdc_io.h"
68 #include "rdc_clnt.h"
69 #include "rdc_bitmap.h"
70 #include "rdc_diskq.h"
71
72
73 kmutex_t rdc_clnt_lock;
74
75 #ifdef DEBUG
76 int noflush = 0;
77 #endif
78
79 int rdc_rpc_tmout = RDC_CLNT_TMOUT;
80 static void rdc_clnt_free(struct chtab *, CLIENT *);
81 static void _rdc_remote_flush(rdc_aio_t *);
82
83 void rdc_flush_memq(int index);
84 void rdc_flush_diskq(int index);
85 int rdc_drain_net_queue(int index);
86 void rdc_flusher_thread(int index);
87 int rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *);
88 void rdc_init_diskq_header(rdc_group_t *grp, dqheader *hd);
89 void rdc_dump_iohdrs(disk_queue *dq);
90 rdc_aio_t *rdc_dequeue(rdc_k_info_t *krdc, int *rc);
91 void rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_off_t qpos);
92 void rdc_close_diskq(rdc_group_t *krdc);
93
94 int rdc_writer(int index);
95
96 static struct chtab *rdc_chtable = NULL;
97 static int rdc_clnt_toomany;
98 #ifdef DEBUG
99 static int rdc_ooreply;
100 #endif
101
102 extern void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag);
103 extern int _rdc_rsrv_diskq(rdc_group_t *group);
104 extern void _rdc_rlse_diskq(rdc_group_t *group);
105
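/*
 * cl_call_sig
 * Wrapper around the client handle's cl_call operation that blocks
 * signals (sigintr/sigunintr) and sets cl_nosignal for the duration
 * of the call, so the RPC cannot be interrupted part way through.
 */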
106 static enum clnt_stat
107 cl_call_sig(struct __client *rh, rpcproc_t proc,
108 xdrproc_t xargs, caddr_t argsp, xdrproc_t xres,
109 caddr_t resp, struct timeval secs)
110 {
111 enum clnt_stat stat;
112 k_sigset_t smask;
113 sigintr(&smask, 0);
114 rh->cl_nosignal = TRUE;
115 stat = ((*(rh)->cl_ops->cl_call)\
116 (rh, proc, xargs, argsp, xres, resp, secs));
117 rh->cl_nosignal = FALSE;
118 sigunintr(&smask);
119 return (stat);
120 }
121
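/*
 * rdc_net_getsize
 * Ask the remote site for the size of the volume paired with this
 * set.  Protocol versions up to 5 use the 32-bit GETSIZE call;
 * later versions use the 64-bit GETSIZE6 call.
 */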
122 int
123 rdc_net_getsize(int index, uint64_t *sizeptr)
124 {
125 struct timeval t;
126 int err, size;
127 rdc_k_info_t *krdc = &rdc_k_info[index];
128 int remote_index = krdc->remote_index;
129
130 *sizeptr = 0;
131 if (krdc->remote_index < 0)
132 return (EINVAL);
133
134 t.tv_sec = rdc_rpc_tmout;
135 t.tv_usec = 0;
136
137 #ifdef DEBUG
138 if (krdc->intf == NULL)
139 cmn_err(CE_WARN,
140 "!rdc_net_getsize: null intf for index %d", index);
141 #endif
142 if (krdc->rpc_version <= RDC_VERSION5) {
143 err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE,
144 krdc->rpc_version, xdr_int, (char *)&remote_index,
145 xdr_int, (char *)&size, &t);
146 if (err == 0)
147 *sizeptr = size;
148 } else {
149 err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE6,
150 krdc->rpc_version, xdr_int, (char *)&remote_index,
151 xdr_u_longlong_t, (char *)sizeptr, &t);
152 }
153 return (err);
154 }
155
156
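/*
 * rdc_net_state
 * Send a state change request (options) for this set to the remote
 * site via the STATE RPC, using the pre- or post-version-7 set_state
 * structure as appropriate.  Returns the remote index on success,
 * or -1 on RPC failure.
 */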
157 int
158 rdc_net_state(int index, int options)
159 {
160 struct timeval t;
161 int err;
162 int remote_index = -1;
163 rdc_u_info_t *urdc = &rdc_u_info[index];
164 rdc_k_info_t *krdc = &rdc_k_info[index];
165 struct set_state s;
166 struct set_state4 s4;
167 char neta[32], rneta[32];
168 unsigned short *sp;
169
170 t.tv_sec = rdc_rpc_tmout;
171 t.tv_usec = 0;
172
173 if (krdc->rpc_version < RDC_VERSION7) {
174 s4.netaddrlen = urdc->primary.addr.len;
175 s4.rnetaddrlen = urdc->secondary.addr.len;
176 bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen);
177 bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen);
178 (void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN);
179 (void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN);
180 s4.flag = options;
181
182 err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
183 krdc->rpc_version, xdr_set_state4, (char *)&s4, xdr_int,
184 (char *)&remote_index, &t);
185 } else {
186 s.netaddrlen = urdc->primary.addr.len;
187 s.rnetaddrlen = urdc->secondary.addr.len;
188 s.netaddr.buf = neta;
189 s.rnetaddr.buf = rneta;
190 bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
191 bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
192 s.netaddr.len = urdc->primary.addr.len;
193 s.rnetaddr.len = urdc->secondary.addr.len;
194 s.netaddr.maxlen = urdc->primary.addr.len;
195 s.rnetaddr.maxlen = urdc->secondary.addr.len;
196 sp = (unsigned short *)s.netaddr.buf;
197 *sp = htons(*sp);
198 sp = (unsigned short *)s.rnetaddr.buf;
199 *sp = htons(*sp);
200 s.pfile = urdc->primary.file;
201 s.sfile = urdc->secondary.file;
202 s.flag = options;
203
204 err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
205 krdc->rpc_version, xdr_set_state, (char *)&s, xdr_int,
206 (char *)&remote_index, &t);
207 }
208
209 if (err)
210 return (-1);
211 else
212 return (remote_index);
213 }
214
215
216 /*
217 * rdc_net_getbmap
218 * gets the bitmaps from the remote side and ORs them into the local bitmap
219 */
220 int
221 rdc_net_getbmap(int index, int size)
222 {
223 struct timeval t;
224 int err;
225 struct bmap b;
226 struct bmap6 b6;
227 rdc_k_info_t *krdc;
228
229 krdc = &rdc_k_info[index];
230
231 if (krdc->remote_index < 0)
232 return (EINVAL);
233
234 t.tv_sec = rdc_rpc_tmout;
235 t.tv_usec = 0;
236 #ifdef DEBUG
237 if (krdc->intf == NULL)
238 cmn_err(CE_WARN,
239 "!rdc_net_getbmap: null intf for index %d", index);
240 #endif
241
242 if (krdc->rpc_version <= RDC_VERSION5) {
243 b.cd = krdc->remote_index;
244 b.dual = index;
245 b.size = size;
246 err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP,
247 krdc->rpc_version, xdr_bmap, (char *)&b, xdr_int,
248 (char *)&err, &t);
249
250 } else {
251 b6.cd = krdc->remote_index;
252 b6.dual = index;
253 b6.size = size;
254 err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP6,
255 krdc->rpc_version, xdr_bmap6, (char *)&b6, xdr_int,
256 (char *)&err, &t);
257 }
258 return (err);
259 }
260
261 int sndr_proto = 0;
262
263 /*
264 * return state corresponding to rdc_host
265 */
266 int
267 rdc_net_getstate(rdc_k_info_t *krdc, int *serial_mode, int *use_mirror,
268 int *mirror_down, int network)
269 {
270 int err;
271 struct timeval t;
272 int state;
273 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
274 struct set_state s;
275 #ifdef sparc
276 struct set_state4 s4;
277 #endif
278 char neta[32];
279 char rneta[32];
280 unsigned short *sp;
281 char *setp = (char *)&s;
282 xdrproc_t xdr_proc = xdr_set_state;
283
284 if (krdc->lsrv && (krdc->intf == NULL || krdc->intf->if_down) &&
285 network) /* fail fast */
286 return (-1);
287
288 s.netaddrlen = urdc->primary.addr.len;
289 s.rnetaddrlen = urdc->secondary.addr.len;
290 s.pfile = urdc->primary.file;
291 s.sfile = urdc->secondary.file;
292 s.netaddr.buf = neta;
293 s.rnetaddr.buf = rneta;
294 bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
295 bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
296 sp = (unsigned short *) s.netaddr.buf;
297 *sp = htons(*sp);
298 sp = (unsigned short *) s.rnetaddr.buf;
299 *sp = htons(*sp);
300 s.netaddr.len = urdc->primary.addr.len;
301 s.rnetaddr.len = urdc->secondary.addr.len;
302 s.netaddr.maxlen = urdc->primary.addr.maxlen;
303 s.rnetaddr.maxlen = urdc->secondary.addr.maxlen;
304 s.flag = 0;
305
306 t.tv_sec = rdc_rpc_tmout;
307 t.tv_usec = 0;
308
309 if (sndr_proto)
310 krdc->rpc_version = sndr_proto;
311 else
312 krdc->rpc_version = RDC_VERS_MAX;
313
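/*
 * negotiate the protocol version: start at the highest version we
 * are prepared to speak and step down one version at a time on
 * RPC_PROGVERSMISMATCH until the server accepts the call or we
 * reach RDC_VERS_MIN.
 */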
314 again:
315 err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSTATE4, krdc->rpc_version,
316 xdr_proc, setp, xdr_int, (char *)&state, &t);
317
318 if (err == RPC_PROGVERSMISMATCH && (krdc->rpc_version !=
319 RDC_VERS_MIN)) {
320 if (krdc->rpc_version-- == RDC_VERSION7) {
321 /* set_state struct changed with v7 of protocol */
322 #ifdef sparc
323 s4.netaddrlen = urdc->primary.addr.len;
324 s4.rnetaddrlen = urdc->secondary.addr.len;
325 bcopy(urdc->primary.addr.buf, s4.netaddr,
326 s4.netaddrlen);
327 bcopy(urdc->secondary.addr.buf, s4.rnetaddr,
328 s4.rnetaddrlen);
329 (void) strncpy(s4.pfile, urdc->primary.file,
330 RDC_MAXNAMLEN);
331 (void) strncpy(s4.sfile, urdc->secondary.file,
332 RDC_MAXNAMLEN);
333 s4.flag = 0;
334 xdr_proc = xdr_set_state4;
335 setp = (char *)&s4;
336 #else
337 /* x64 can not use protocols < 7 */
338 return (-1);
339 #endif
340 }
341 goto again;
342 }
343 #ifdef DEBUG
344 cmn_err(CE_NOTE, "!sndr get_state: Protocol ver %d", krdc->rpc_version);
345 #endif
346
347 if (err) {
348 return (-1);
349 }
350
351 if (state == -1)
352 return (-1);
353
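/*
 * decode the state bits returned by the server:
 * bit 0 - mirror down, bit 1 - use mirror, bit 2 - serial mode
 */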
354 if (serial_mode)
355 *serial_mode = (state >> 2) & 1;
356 if (use_mirror)
357 *use_mirror = (state >> 1) & 1;
358 if (mirror_down)
359 *mirror_down = state & 1;
360
361 return (0);
362 }
363
364
365 static struct xdr_discrim rdres_discrim[2] = {
366 { (int)RDC_OK, xdr_readok },
367 { __dontcare__, NULL_xdrproc_t }
368 };
369
370
371 /*
372 * Reply from remote read (client side)
373 */
374 static bool_t
375 xdr_rdresult(XDR *xdrs, readres *rr)
376 {
377
378 return (xdr_union(xdrs, (enum_t *)&(rr->rr_status),
379 (caddr_t)&(rr->rr_ok), rdres_discrim, xdr_void));
380 }
381
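/*
 * rdc_rrstatus_decode
 * Map a remote read status (RDCERR_*) onto a local errno value.
 */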
382 static int
383 rdc_rrstatus_decode(int status)
384 {
385 int ret = 0;
386
387 if (status != RDC_OK) {
388 switch (status) {
389 case RDCERR_NOENT:
390 ret = ENOENT;
391 break;
392 case RDCERR_NOMEM:
393 ret = ENOMEM;
394 break;
395 default:
396 ret = EIO;
397 break;
398 }
399 }
400
401 return (ret);
402 }
403
404
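/*
 * rdc_net_read
 * Remote read client side.
 * If the FCAL direct path is usable the read is satisfied by copying
 * from the remote_fd buffer.  Otherwise a setup READ RPC reserves an
 * index on the server, and the data is then pulled back in chunks of
 * at most maxfbas FBAs with further READ RPCs, each reply being
 * copied into the caller's handle.
 */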
405 int
406 rdc_net_read(int local_index, int remote_index, nsc_buf_t *handle,
407 nsc_off_t fba_pos, nsc_size_t fba_len)
408 {
409 struct rdcrdresult rr;
410 rdc_k_info_t *krdc;
411 rdc_u_info_t *urdc;
412 struct rread list;
413 struct rread6 list6;
414 struct timeval t;
415 uchar_t *sv_addr;
416 nsc_vec_t *vec;
417 int rpc_flag;
418 nsc_size_t sv_len;
419 int err;
420 int ret;
421 nsc_size_t len;
422 nsc_size_t maxfbas;
423 int transflag;
424
425 if (handle == NULL)
426 return (EINVAL);
427
428 if (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len)) {
429 #ifdef DEBUG
430 cmn_err(CE_NOTE, "!rdc_net_read: handle bounds");
431 #endif
432 return (EINVAL);
433 }
434
435 krdc = &rdc_k_info[local_index];
436 urdc = &rdc_u_info[local_index];
437
438 maxfbas = MAX_RDC_FBAS;
439
440 if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
441 nsc_buf_t *remote_h = NULL;
442 int reserved = 0;
443
444 ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
445 if (RDC_SUCCESS(ret)) {
446 reserved = 1;
447 ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
448 NSC_RDBUF, &remote_h);
449 }
450 if (RDC_SUCCESS(ret)) {
451 ret = nsc_copy(remote_h, handle, fba_pos, fba_pos,
452 fba_len);
453 if (RDC_SUCCESS(ret)) {
454 (void) nsc_free_buf(remote_h);
455 nsc_release(krdc->remote_fd);
456 return (0);
457 }
458 }
459 rdc_group_enter(krdc);
460 rdc_set_flags(urdc, RDC_FCAL_FAILED);
461 rdc_group_exit(krdc);
462 if (remote_h)
463 (void) nsc_free_buf(remote_h);
464 if (reserved)
465 nsc_release(krdc->remote_fd);
466 }
467
468 t.tv_sec = rdc_rpc_tmout;
469 t.tv_usec = 0;
470
471 if (rdc_get_vflags(urdc) & RDC_VOL_FAILED)
472 rpc_flag = RDC_RREAD_FAIL;
473 else
474 rpc_flag = 0;
475
476 #ifdef DEBUG
477 if (krdc->intf == NULL)
478 cmn_err(CE_WARN,
479 "!rdc_net_read: null intf for index %d", local_index);
480 #endif
481 /*
482 * switch on proto version.
483 */
484 len = fba_len; /* length (FBAs) still to xfer */
485 rr.rr_bufsize = 0; /* rpc data buffer length (bytes) */
486 rr.rr_data = NULL; /* rpc data buffer */
487 transflag = rpc_flag | RDC_RREAD_START; /* setup rpc */
488 if (krdc->rpc_version <= RDC_VERSION5) {
489 ASSERT(fba_pos <= INT32_MAX);
490 list.pos = (int)fba_pos; /* fba position of start of chunk */
491 list.cd = remote_index; /* remote end cd */
492 /* send setup rpc */
493 list.flag = transflag;
494 ASSERT(len <= INT32_MAX);
495 list.len = (int)len; /* total fba length */
496 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
497 krdc->rpc_version, xdr_rread, (char *)&list, xdr_int,
498 (char *)&ret, &t);
499
500 } else {
501 list6.pos = fba_pos; /* fba position of start of chunk */
502 list6.cd = remote_index; /* remote end cd */
503 /* send setup rpc */
504 list6.flag = transflag; /* setup rpc */
505 ASSERT(len <= INT32_MAX);
506 list6.len = (int)len; /* total fba length */
507 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
508 krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_int,
509 (char *)&ret, &t);
510 }
511
512 if (err) {
513 #ifdef DEBUG
514 cmn_err(CE_NOTE, "!rdc_net_read: setup err %d", err);
515 #endif
516 if (err == RPC_INTR)
517 ret = EINTR;
518 else
519 ret = ENOLINK;
520
521 goto remote_rerror;
522 }
523
524 if (ret == 0) { /* No valid index from r_net_read */
525 #ifdef DEBUG
526 cmn_err(CE_NOTE,
527 "!rdc_net_read: no valid index from r_net_read");
528 #endif
529 return (ENOBUFS);
530 }
531 transflag = rpc_flag | RDC_RREAD_DATA;
532 if (krdc->rpc_version <= RDC_VERSION5) {
533 list.idx = ret; /* save idx to return to server */
534 list.flag = transflag;
535 /* move on to data xfer rpcs */
536 } else {
537 list6.idx = ret; /* save idx to return to server */
538 list6.flag = transflag;
539 }
540
541 /* find starting position in handle */
542
543 vec = handle->sb_vec;
544
545 fba_pos -= handle->sb_pos;
546
547 for (; fba_pos >= FBA_NUM(vec->sv_len); vec++)
548 fba_pos -= FBA_NUM(vec->sv_len);
549
550 sv_addr = vec->sv_addr + FBA_SIZE(fba_pos); /* data in vector */
551 sv_len = vec->sv_len - FBA_SIZE(fba_pos); /* bytes in vector */
552
553 while (len) {
554 nsc_size_t translen;
555 if (len > maxfbas) {
556 translen = maxfbas;
557 } else {
558 translen = len;
559 }
560
561 if (FBA_SIZE(translen) > sv_len) {
562 translen = FBA_NUM(sv_len);
563 }
564
565 len -= translen;
566 if (len == 0) {
567 /* last data xfer rpc - tell server to cleanup */
568 transflag |= RDC_RREAD_END;
569 }
570
571 if (!rr.rr_data || (nsc_size_t)rr.rr_bufsize !=
572 FBA_SIZE(translen)) {
573 if (rr.rr_data)
574 kmem_free(rr.rr_data, rr.rr_bufsize);
575
576 ASSERT(FBA_SIZE(translen) <= INT32_MAX);
577 rr.rr_bufsize = FBA_SIZE(translen);
578 rr.rr_data = kmem_alloc(rr.rr_bufsize, KM_NOSLEEP);
579 }
580
581 if (!rr.rr_data) {
582 /* error */
583 #ifdef DEBUG
584 cmn_err(CE_NOTE, "!rdc_net_read: kmem_alloc failed");
585 #endif
586 return (ENOMEM);
587 }
588
589 /* get data from remote end */
590
591 #ifdef DEBUG
592 if (krdc->intf == NULL)
593 cmn_err(CE_WARN,
594 "!rdc_net_read: null intf for index %d",
595 local_index);
596 #endif
597 if (krdc->io_kstats) {
598 mutex_enter(krdc->io_kstats->ks_lock);
599 kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
600 mutex_exit(krdc->io_kstats->ks_lock);
601 }
602 /*CONSTCOND*/
603 ASSERT(RDC_MAXDATA <= INT32_MAX);
604 ASSERT(translen <= RDC_MAXDATA);
605 if (krdc->rpc_version <= RDC_VERSION5) {
606 list.len = (int)translen;
607 list.flag = transflag;
608 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
609 krdc->rpc_version, xdr_rread, (char *)&list,
610 xdr_rdresult, (char *)&rr, &t);
611 } else {
612 list6.len = (int)translen;
613 list6.flag = transflag;
614 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
615 krdc->rpc_version, xdr_rread6, (char *)&list6,
616 xdr_rdresult, (char *)&rr, &t);
617 }
618
619 if (krdc->io_kstats) {
620 mutex_enter(krdc->io_kstats->ks_lock);
621 kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
622 mutex_exit(krdc->io_kstats->ks_lock);
623 }
624
625 if (err) {
626 #ifdef DEBUG
627 cmn_err(CE_NOTE, "!rdc_net_read: rpc err %d", err);
628 #endif
629 if (err == RPC_INTR) {
630 ret = EINTR;
631 } else {
632 ret = ENOLINK;
633 }
634
635 goto remote_rerror;
636 }
637
638 if (rr.rr_status != RDC_OK) {
639 ret = rdc_rrstatus_decode(rr.rr_status);
640 if (!ret)
641 ret = EIO;
642
643 goto remote_rerror;
644 }
645
646 /* copy into handle */
647
648 bcopy(rr.rr_data, sv_addr, (size_t)rr.rr_bufsize);
649
650 /* update counters */
651
652 sv_addr += rr.rr_bufsize;
653 if (krdc->rpc_version <= RDC_VERSION5) {
654 list.pos += translen;
655 } else {
656 list6.pos += translen;
657 }
658 if (krdc->io_kstats) {
659 KSTAT_IO_PTR(krdc->io_kstats)->reads++;
660 KSTAT_IO_PTR(krdc->io_kstats)->nread += rr.rr_bufsize;
661 }
662 ASSERT(sv_len <= INT32_MAX);
663 ASSERT(sv_len >= (nsc_size_t)rr.rr_bufsize);
664 sv_len -= rr.rr_bufsize;
665
666 if (sv_len == 0) {
667 /* goto next vector */
668 vec++;
669 sv_addr = vec->sv_addr;
670 sv_len = vec->sv_len;
671 }
672 }
673
674 if (rr.rr_data)
675 kmem_free(rr.rr_data, rr.rr_bufsize);
676
677 return (0);
678
679 remote_rerror:
680 if (rr.rr_data)
681 kmem_free(rr.rr_data, rr.rr_bufsize);
682
683 return (ret ? ret : ENOLINK);
684 }
685
686 /*
687 * rdc_net_write
688 * Main remote write client side
689 * Handles protocol selection as well as requests for remote allocation
690 * and data transfer
691 * Does local IO for FCAL
692 * caller must clear bitmap on success
693 */
694
695 int
696 rdc_net_write(int local_index, int remote_index, nsc_buf_t *handle,
697 nsc_off_t fba_pos, nsc_size_t fba_len, uint_t aseq, int qpos,
698 netwriteres *netres)
699 {
700 rdc_k_info_t *krdc;
701 rdc_u_info_t *urdc;
702 struct timeval t;
703 nsc_vec_t *vec;
704 int sv_len;
705 nsc_off_t fpos;
706 int err;
707 struct netwriteres netret;
708 struct netwriteres *netresptr;
709 struct net_data5 dlist5;
710 struct net_data6 dlist6;
711 int ret;
712 nsc_size_t maxfbas;
713 int transflag;
714 int translen;
715 int transendoblk;
716 char *transptr;
717 int vflags;
718
719 if (handle == NULL)
720 return (EINVAL);
721
722 /* if not a diskq buffer */
723 if ((qpos == -1) && (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len))) {
724 #ifdef DEBUG
725 cmn_err(CE_NOTE, "!rdc_net_write: handle bounds");
726 #endif
727 return (EINVAL);
728 }
729
730
731 t.tv_sec = rdc_rpc_tmout;
732 t.tv_usec = 0;
733
734 krdc = &rdc_k_info[local_index];
735 urdc = &rdc_u_info[local_index];
736
737 maxfbas = MAX_RDC_FBAS;
738
739 /* FCAL IO */
740 if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
741 nsc_buf_t *remote_h = NULL;
742 int reserved = 0;
743
744 ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
745 if (RDC_SUCCESS(ret)) {
746 reserved = 1;
747 ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
748 NSC_WRBUF, &remote_h);
749 }
750 if (RDC_SUCCESS(ret)) {
751 ret = nsc_copy(handle, remote_h, fba_pos, fba_pos,
752 fba_len);
753 if (RDC_SUCCESS(ret))
754 ret = nsc_write(remote_h, fba_pos, fba_len, 0);
755 if (RDC_SUCCESS(ret)) {
756 (void) nsc_free_buf(remote_h);
757 nsc_release(krdc->remote_fd);
758 return (0);
759 }
760 }
761 rdc_group_enter(krdc);
762 rdc_set_flags(urdc, RDC_FCAL_FAILED);
763 rdc_group_exit(krdc);
764 if (remote_h)
765 (void) nsc_free_buf(remote_h);
766 if (reserved)
767 nsc_release(krdc->remote_fd);
768 }
769
770 /*
771 * At this point we must decide which protocol we are using and
772 * do the right thing
773 */
774 netret.vecdata.vecdata_val = NULL;
775 netret.vecdata.vecdata_len = 0;
776 if (netres) {
777 netresptr = netres;
778 } else {
779 netresptr = &netret;
780 }
781
782 vflags = rdc_get_vflags(urdc);
783
784 if (vflags & (RDC_VOL_FAILED|RDC_BMP_FAILED))
785 transflag = RDC_RWRITE_FAIL;
786 else
787 transflag = 0;
788
789
790 #ifdef DEBUG
791 if (krdc->intf == NULL)
792 cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d",
793 local_index);
794 #endif
795
796 vec = handle->sb_vec;
797
798 /*
799 * find starting position in vector
800 */
801 if ((qpos == -1) || (handle->sb_user == RDC_NULLBUFREAD))
802 fpos = fba_pos - handle->sb_pos;
803 else
804 fpos = (qpos + 1) - handle->sb_pos;
805
806 for (; fpos >= FBA_NUM(vec->sv_len); vec++)
807 fpos -= FBA_NUM(vec->sv_len);
808 sv_len = vec->sv_len - FBA_SIZE(fpos); /* bytes in vector */
809 transptr = (char *)vec->sv_addr + FBA_SIZE(fpos);
810
811 if (krdc->rpc_version <= RDC_VERSION5) {
812 dlist5.local_cd = local_index;
813 dlist5.cd = remote_index;
814 ASSERT(fba_len <= INT32_MAX);
815 ASSERT(fba_pos <= INT32_MAX);
816 dlist5.len = (int)fba_len;
817 dlist5.pos = (int)fba_pos;
818 dlist5.idx = -1; /* Starting index */
819 dlist5.flag = transflag;
820 dlist5.seq = aseq; /* sequence number */
821 dlist5.sfba = (int)fba_pos; /* starting fba for this xfer */
822 } else {
823 dlist6.local_cd = local_index;
824 dlist6.cd = remote_index;
825 ASSERT(fba_len <= INT32_MAX);
826 dlist6.len = (int)fba_len;
827 dlist6.qpos = qpos;
828 dlist6.pos = fba_pos;
829 dlist6.idx = -1; /* Starting index */
830 dlist6.flag = transflag;
831 dlist6.seq = aseq; /* sequence number */
832 dlist6.sfba = fba_pos; /* starting fba for this xfer */
833 }
834
835 transendoblk = 0;
836 while (fba_len) {
837 if (!transptr) {
838 #ifdef DEBUG
839 cmn_err(CE_WARN,
840 "!rdc_net_write: walked off end of handle!");
841 #endif
842 ret = EINVAL;
843 goto remote_error;
844 }
845
846 if (fba_len > maxfbas) {
847 ASSERT(maxfbas <= INT32_MAX);
848 translen = (int)maxfbas;
849 } else {
850 ASSERT(fba_len <= INT32_MAX);
851 translen = (int)fba_len;
852 }
853
854 if (FBA_SIZE(translen) > sv_len) {
855 translen = FBA_NUM(sv_len);
856 }
857
858 fba_len -= translen;
859 if (fba_len == 0) {
860 /* last data xfer - tell server to commit */
861 transendoblk = 1;
862 }
863
864
865 #ifdef DEBUG
866 if (krdc->intf == NULL)
867 cmn_err(CE_WARN,
868 "!rdc_net_write: null intf for index %d",
869 local_index);
870 #endif
871 DTRACE_PROBE(rdc_netwrite_clntcall_start);
872
873 if (krdc->io_kstats) {
874 mutex_enter(krdc->io_kstats->ks_lock);
875 kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
876 mutex_exit(krdc->io_kstats->ks_lock);
877 }
878 if (krdc->rpc_version <= RDC_VERSION5) {
879 ret = 0;
880 dlist5.nfba = translen;
881 dlist5.endoblk = transendoblk;
882 dlist5.data.data_len = FBA_SIZE(translen);
883 dlist5.data.data_val = transptr;
884 err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE5,
885 krdc->rpc_version, xdr_net_data5,
886 (char *)&dlist5, xdr_int,
887 (char *)&ret, &t);
888 if (ret >= 0) {
889 netresptr->result = 0;
890 netresptr->index = ret;
891 } else {
892 netresptr->result = ret;
893 }
894 } else {
895 netresptr->result = 0;
896 dlist6.nfba = translen;
897 dlist6.endoblk = transendoblk;
898 dlist6.data.data_len = FBA_SIZE(translen);
899 dlist6.data.data_val = transptr;
900 err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6,
901 krdc->rpc_version, xdr_net_data6,
902 (char *)&dlist6, xdr_netwriteres,
903 (char *)netresptr, &t);
904 }
905
906 if (krdc->io_kstats) {
907 mutex_enter(krdc->io_kstats->ks_lock);
908 kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
909 mutex_exit(krdc->io_kstats->ks_lock);
910 }
911
912 DTRACE_PROBE(rdc_netwrite_clntcall_end);
913 ret = netresptr->result;
914 if (err) {
915 if (err == RPC_INTR)
916 ret = EINTR;
917 else if (err && ret != EPROTO)
918 ret = ENOLINK;
919 #ifdef DEBUG
920 cmn_err(CE_NOTE,
921 "!rdc_net_write(5): cd %d err %d ret %d",
922 remote_index, err, ret);
923 #endif
924 goto remote_error;
925 }
926 /* Error from r_net_write5 */
927 if (netresptr->result < 0) {
928 #ifdef DEBUG
929 cmn_err(CE_NOTE,
930 "!rdc_net_write: r_net_write(5) "
931 "returned: %d",
932 -netresptr->result);
933 #endif
934 ret = -netresptr->result;
935 if (netret.vecdata.vecdata_val)
936 kmem_free(netret.vecdata.vecdata_val,
937 netret.vecdata.vecdata_len *
938 sizeof (net_pendvec_t));
939 goto remote_error;
940 } else if (netresptr->index == 0) {
941 #ifdef DEBUG
942 cmn_err(CE_NOTE,
943 "!rdc_net_write: no valid index from "
944 "r_net_write(5)");
945 #endif
946 ret = ENOBUFS;
947 if (netret.vecdata.vecdata_val)
948 kmem_free(netret.vecdata.vecdata_val,
949 netret.vecdata.vecdata_len *
950 sizeof (net_pendvec_t));
951 goto remote_error;
952 }
953 if (krdc->rpc_version <= RDC_VERSION5) {
954 dlist5.idx = netresptr->index;
955 dlist5.sfba += dlist5.nfba;
956 } else {
957 dlist6.idx = netresptr->index;
958 dlist6.sfba += dlist6.nfba;
959 }
960 /* update counters */
961 if (krdc->io_kstats) {
962 KSTAT_IO_PTR(krdc->io_kstats)->writes++;
963 KSTAT_IO_PTR(krdc->io_kstats)->nwritten +=
964 FBA_SIZE(translen);
965 }
966 transptr += FBA_SIZE(translen);
967 sv_len -= FBA_SIZE(translen);
968
969 if (sv_len <= 0) {
970 /* goto next vector */
971 vec++;
972 transptr = (char *)vec->sv_addr;
973 sv_len = vec->sv_len;
974 }
975 }
976 /*
977 * this can't happen.....
978 */
979 if (netret.vecdata.vecdata_val)
980 kmem_free(netret.vecdata.vecdata_val,
981 netret.vecdata.vecdata_len *
982 sizeof (net_pendvec_t));
983
984 return (0);
985
986 remote_error:
987 return (ret ? ret : ENOLINK);
988 }
989
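/*
 * rdc_fixlen
 * Recalculate the length (sb_len) of the queue handle by walking its
 * vector list.
 */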
990 void
991 rdc_fixlen(rdc_aio_t *aio)
992 {
993 nsc_vec_t *vecp = aio->qhandle->sb_vec;
994 nsc_size_t len = 0;
995
996 while (vecp->sv_addr) {
997 len += FBA_NUM(vecp->sv_len);
998 vecp++;
999 }
1000 aio->qhandle->sb_len = len;
1001 }
1002
1003 /*
1004 * rdc_dump_alloc_bufs_cd
1005 * Dump allocated buffers (rdc_net_hnd's) for the specified cd.
1006 * this could be the flusher failing, if so, don't do the delay forever
1007 * Returns: 0 (success), EAGAIN (caller needs to try again).
1008 */
1009 int
1010 rdc_dump_alloc_bufs_cd(int index)
1011 {
1012 rdc_k_info_t *krdc;
1013 rdc_aio_t *aio;
1014 net_queue *q;
1015 disk_queue *dq;
1016 kmutex_t *qlock;
1017
1018 krdc = &rdc_k_info[index];
1019
1020
1021 if (!krdc->c_fd) {
1022 /* cannot do anything! */
1023 #ifdef DEBUG
1024 cmn_err(CE_WARN, "!rdc_dump_alloc_bufs_cd(%d): c_fd NULL",
1025 index);
1026 #endif
1027 return (0);
1028 }
1029 rdc_dump_dsets(index);
1030
1031 dq = &krdc->group->diskq;
1032
1033 if (RDC_IS_DISKQ(krdc->group)) {
1034 qlock = QLOCK(dq);
1035 (void) _rdc_rsrv_diskq(krdc->group);
1036 } else {
1037 qlock = &krdc->group->ra_queue.net_qlock;
1038 }
1039
1040 /*
1041 * Now dump the async queue anonymous buffers
1042 * if we are a diskq, then we are using the diskq mutex.
1043 * However, we are flushing from diskq to memory queue
1044 * so we now need to grab the memory lock also
1045 */
1046
1047 q = &krdc->group->ra_queue;
1048
1049 if (RDC_IS_DISKQ(krdc->group)) {
1050 mutex_enter(&q->net_qlock);
1051 if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
1052 int tries = 5;
1053 #ifdef DEBUG_DISKQ
1054 cmn_err(CE_NOTE,
1055 "!dumpalloccd sending diskq->memq flush to sleep");
1056 #endif
1057 q->qfflags |= RDC_QFILLSLEEP;
1058 mutex_exit(&q->net_qlock);
1059
1060 while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
1061 delay(5);
1062 mutex_enter(&q->net_qlock);
1063 }
1064 }
1065
1066 mutex_enter(qlock);
1067
1068 while ((q->net_qhead != NULL)) {
1069 rdc_k_info_t *tmpkrdc;
1070 aio = q->net_qhead;
1071 tmpkrdc = &rdc_k_info[aio->index];
1072
1073 if (RDC_IS_DISKQ(krdc->group)) {
1074 aio->qhandle->sb_user--;
1075 if (aio->qhandle->sb_user == 0) {
1076 rdc_fixlen(aio);
1077 (void) nsc_free_buf(aio->qhandle);
1078 aio->qhandle = NULL;
1079 aio->handle = NULL;
1080 }
1081 } else {
1082 if (aio->handle) {
1083 (void) nsc_free_buf(aio->handle);
1084 aio->handle = NULL;
1085 }
1086 }
1087
1088 if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(krdc->group)) {
1089 mutex_enter(tmpkrdc->io_kstats->ks_lock);
1090 kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
1091 mutex_exit(tmpkrdc->io_kstats->ks_lock);
1092 }
1093 q->net_qhead = q->net_qhead->next;
1094 q->blocks -= aio->len;
1095 q->nitems--;
1096
1097 RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);
1098
1099 kmem_free(aio, sizeof (*aio));
1100 }
1101 q->net_qtail = NULL;
1102
1103 if (krdc->group->asyncstall) {
1104 krdc->group->asyncdis = 1;
1105 cv_broadcast(&krdc->group->asyncqcv);
1106 }
1107 if (krdc->group->sleepq) {
1108 rdc_sleepqdiscard(krdc->group);
1109 }
1110
1111 krdc->group->seq = RDC_NEWSEQ;
1112 krdc->group->seqack = RDC_NEWSEQ;
1113 if (RDC_IS_DISKQ(krdc->group)) {
1114 rdc_dump_iohdrs(dq);
1115 SET_QNXTIO(dq, QHEAD(dq));
1116 SET_QCOALBOUNDS(dq, QHEAD(dq));
1117 }
1118 mutex_exit(qlock);
1119
1120 if (RDC_IS_DISKQ(krdc->group)) {
1121 mutex_exit(&q->net_qlock);
1122 _rdc_rlse_diskq(krdc->group);
1123 }
1124
1125 return (0);
1126 }
1127
1128
1129 /*
1130 * rdc_dump_alloc_bufs
1131 * We have an error on the link
1132 * Try to dump all of the allocated bufs so we can cleanly recover
1133 * and not hang
1134 */
1135 void
1136 rdc_dump_alloc_bufs(rdc_if_t *ip)
1137 {
1138 rdc_k_info_t *krdc;
1139 int repeat;
1140 int index;
1141
1142 for (index = 0; index < rdc_max_sets; index++) {
1143 do {
1144 krdc = &rdc_k_info[index];
1145 repeat = 0;
1146 if (krdc->intf == ip) {
1147 if (rdc_dump_alloc_bufs_cd(index) == EAGAIN) {
1148 repeat = 1;
1149 delay(2);
1150 }
1151 }
1152 } while (repeat);
1153 }
1154 }
1155
1156 /*
1157 * returns 1 if the throttle should throttle, 0 if not.
1158 */
1159 int
1160 _rdc_diskq_isfull(disk_queue *q, long len)
1161 {
1162 /* ---T----H----N--- */
1163 mutex_enter(QLOCK(q));
1164
1165 if (FITSONQ(q, len + 1)) {
1166 mutex_exit(QLOCK(q));
1167 return (0);
1168 }
1169 mutex_exit(QLOCK(q));
1170 return (1);
1171 }
1172
1173 void
1174 _rdc_async_throttle(rdc_k_info_t *this, long len)
1175 {
1176 rdc_k_info_t *krdc;
1177 rdc_u_info_t *urdc;
1178 int print_msg = 1;
1179 int tries = RDC_FUTILE_ATTEMPTS;
1180
1181 /*
1182 * Throttle entries on queue
1183 */
1184
1185 /* Need to take the 1-many case into account, checking all sets */
1186
1187 /* ADD HANDY HEURISTIC HERE TO SLOW DOWN IO */
1188 for (krdc = this; /* CSTYLED */; krdc = krdc->many_next) {
1189 urdc = &rdc_u_info[krdc->index];
1190
1191 /*
1192 * this may be the last set standing in a one to many setup.
1193 * we may also be stuck in unintercept, having marked
1194 * the volume as not enabled but not yet removed it
1195 * from the many list, resulting in an endless loop if
1196 * we just continue here. Let's jump over this stuff
1197 * and check whether we are the only set left here.
1198 */
1199 if (!IS_ENABLED(urdc))
1200 goto thischeck;
1201
1202 if (IS_ASYNC(urdc) && RDC_IS_MEMQ(krdc->group)) {
1203 net_queue *q = &krdc->group->ra_queue;
1204 while ((q->blocks + q->inflbls) > urdc->maxqfbas ||
1205 (q->nitems + q->inflitems) > urdc->maxqitems) {
1206
1207 if (!IS_ENABLED(urdc)) /* disable race */
1208 goto thischeck;
1209
1210 if (!krdc->group->rdc_writer)
1211 (void) rdc_writer(krdc->index);
1212 delay(2);
1213 q->throttle_delay++;
1214 }
1215 }
1216
1217 /* do a much more aggressive delay, get disk flush going */
1218 if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group)) {
1219 disk_queue *q = &krdc->group->diskq;
1220 while ((!IS_QSTATE(q, RDC_QNOBLOCK)) &&
1221 (_rdc_diskq_isfull(q, len)) &&
1222 (!IS_STATE(urdc, RDC_DISKQ_FAILED))) {
1223 if (print_msg) {
1224 cmn_err(CE_WARN, "!rdc async throttle:"
1225 " disk queue %s full",
1226 &urdc->disk_queue[0]);
1227
1228 print_msg = 0;
1229 }
1230 if (!IS_ENABLED(urdc)) /* disable race */
1231 goto thischeck;
1232
1233 if (!krdc->group->rdc_writer)
1234 (void) rdc_writer(krdc->index);
1235 delay(10);
1236 q->throttle_delay += 10;
1237
1238 if (!(tries--) && IS_STATE(urdc, RDC_QUEUING)) {
1239 cmn_err(CE_WARN, "!SNDR: disk queue "
1240 "%s full & not flushing. giving up",
1241 &urdc->disk_queue[0]);
1242 cmn_err(CE_WARN, "!SNDR: %s:%s entering"
1243 " logging mode",
1244 urdc->secondary.intf,
1245 urdc->secondary.file);
1246 rdc_fail_diskq(krdc, RDC_WAIT,
1247 RDC_DOLOG | RDC_NOFAIL);
1248 mutex_enter(QLOCK(q));
1249 cv_broadcast(&q->qfullcv);
1250 mutex_exit(QLOCK(q));
1251 }
1252
1253 }
1254 if ((IS_QSTATE(q, RDC_QNOBLOCK)) &&
1255 _rdc_diskq_isfull(q, len) &&
1256 !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
1257 if (print_msg) {
1258 cmn_err(CE_WARN, "!disk queue %s full",
1259 &urdc->disk_queue[0]);
1260 print_msg = 0;
1261 }
1262 rdc_fail_diskq(krdc, RDC_WAIT,
1263 RDC_DOLOG | RDC_NOFAIL);
1264 mutex_enter(QLOCK(q));
1265 cv_broadcast(&q->qfullcv);
1266 mutex_exit(QLOCK(q));
1267 }
1268 }
1269
1270 thischeck:
1271 if (krdc->many_next == this)
1272 break;
1273 }
1274 }
1275
1276 int rdc_coalesce = 1;
1277 static int rdc_joins = 0;
1278
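/*
 * rdc_aio_coalesce
 * Try to merge the new I/O with the entry already queued ahead of it.
 * The two are merged only if they belong to the same set, are
 * strictly adjacent, and the combined length does not exceed
 * MAX_RDC_FBAS.  Returns 1 if the new I/O was absorbed into the
 * queued entry, 0 otherwise.
 */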
1279 int
1280 rdc_aio_coalesce(rdc_aio_t *queued, rdc_aio_t *new)
1281 {
1282 nsc_buf_t *h = NULL;
1283 int rc;
1284 rdc_k_info_t *krdc;
1285 uint_t bitmask;
1286
1287 if (rdc_coalesce == 0)
1288 return (0); /* don't even try */
1289
1290 if ((queued == NULL) ||
1291 (queued->handle == NULL) ||
1292 (new->handle == NULL)) {
1293 return (0); /* existing queue is empty */
1294 }
1295 if (queued->index != new->index || queued->len + new->len >
1296 MAX_RDC_FBAS) {
1297 return (0); /* I/O too big */
1298 }
1299 if ((queued->pos + queued->len == new->pos) ||
1300 (new->pos + new->len == queued->pos)) {
1301 rc = nsc_alloc_abuf(queued->pos, queued->len + new->len, 0,
1302 &h);
1303 if (!RDC_SUCCESS(rc)) {
1304 if (h != NULL)
1305 (void) nsc_free_buf(h);
1306 return (0); /* couldn't do coalesce */
1307 }
1308 rc = nsc_copy(queued->handle, h, queued->pos, queued->pos,
1309 queued->len);
1310 if (!RDC_SUCCESS(rc)) {
1311 (void) nsc_free_buf(h);
1312 return (0); /* couldn't do coalesce */
1313 }
1314 rc = nsc_copy(new->handle, h, new->pos, new->pos,
1315 new->len);
1316 if (!RDC_SUCCESS(rc)) {
1317 (void) nsc_free_buf(h);
1318 return (0); /* couldn't do coalesce */
1319 }
1320
1321 krdc = &rdc_k_info[queued->index];
1322
1323 RDC_SET_BITMASK(queued->pos, queued->len, &bitmask);
1324 RDC_CLR_BITMAP(krdc, queued->pos, queued->len, \
1325 bitmask, RDC_BIT_BUMP);
1326
1327 RDC_SET_BITMASK(new->pos, new->len, &bitmask);
1328 RDC_CLR_BITMAP(krdc, new->pos, new->len, \
1329 bitmask, RDC_BIT_BUMP);
1330
1331 (void) nsc_free_buf(queued->handle);
1332 (void) nsc_free_buf(new->handle);
1333 queued->handle = h;
1334 queued->len += new->len;
1335 bitmask = 0;
1336 /*
1337 * bump the ref count back up
1338 */
1339
1340 RDC_SET_BITMAP(krdc, queued->pos, queued->len, &bitmask);
1341 return (1); /* new I/O succeeds last I/O queued */
1342 }
1343 return (0);
1344 }
1345
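/*
 * rdc_memq_enqueue
 * Add an aio to the tail of the in-memory async queue, coalescing it
 * with the current tail when possible.  Otherwise the aio is given
 * the next sequence number, the queue counters are updated and a
 * writer thread is kicked off if one is not already running.
 */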
1346 int
1347 rdc_memq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
1348 {
1349 net_queue *q;
1350 rdc_group_t *group;
1351
1352 group = krdc->group;
1353 q = &group->ra_queue;
1354
1355 mutex_enter(&q->net_qlock);
1356
1357 if (rdc_aio_coalesce(q->net_qtail, aio)) {
1358 rdc_joins++;
1359 q->blocks += aio->len;
1360 kmem_free(aio, sizeof (*aio));
1361 goto out;
1362 }
1363 aio->seq = group->seq++;
1364 if (group->seq < aio->seq)
1365 group->seq = RDC_NEWSEQ + 1; /* skip magics */
1366
1367 if (q->net_qhead == NULL) {
1368 /* adding to empty q */
1369 q->net_qhead = q->net_qtail = aio;
1370
1371 #ifdef DEBUG
1372 if (q->blocks != 0 || q->nitems != 0) {
1373 cmn_err(CE_PANIC,
1374 "rdc enqueue: q %p, qhead 0, q blocks %" NSC_SZFMT
1375 ", nitems %" NSC_SZFMT,
1376 (void *) q, q->blocks, q->nitems);
1377 }
1378 #endif
1379
1380 } else {
1381 /* discontiguous, add aio to q tail */
1382 q->net_qtail->next = aio;
1383 q->net_qtail = aio;
1384 }
1385
1386 q->blocks += aio->len;
1387 q->nitems++;
1388
1389 if (krdc->io_kstats) {
1390 mutex_enter(krdc->io_kstats->ks_lock);
1391 kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
1392 mutex_exit(krdc->io_kstats->ks_lock);
1393 }
1394 out:
1395 #ifdef DEBUG
1396 /* sum the q and check for sanity */
1397 {
1398 nsc_size_t qblocks = 0;
1399 uint64_t nitems = 0;
1400 rdc_aio_t *a;
1401
1402 for (a = q->net_qhead; a != NULL; a = a->next) {
1403 qblocks += a->len;
1404 nitems++;
1405 }
1406
1407 if (qblocks != q->blocks || nitems != q->nitems) {
1408 cmn_err(CE_PANIC,
1409 "rdc enqueue: q %p, q blocks %" NSC_SZFMT " (%"
1410 NSC_SZFMT "), nitems %" NSC_SZFMT " (%"
1411 NSC_SZFMT ")", (void *) q, q->blocks, qblocks,
1412 q->nitems, nitems);
1413 }
1414 }
1415 #endif
1416
1417 mutex_exit(&q->net_qlock);
1418
1419 if (q->nitems > q->nitems_hwm) {
1420 q->nitems_hwm = q->nitems;
1421 }
1422
1423 if (q->blocks > q->blocks_hwm) {
1424 q->blocks_hwm = q->blocks;
1425 }
1426
1427 if (!krdc->group->rdc_writer)
1428 (void) rdc_writer(krdc->index);
1429
1430 return (0);
1431 }
1432
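/*
 * _rdc_enqueue_write
 * Package a write into an rdc_aio_t and place it on the group's
 * async queue - the memory queue or the disk queue, depending on
 * how the group is configured.
 */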
1433 int
1434 _rdc_enqueue_write(rdc_k_info_t *krdc, nsc_off_t pos, nsc_size_t len, int flag,
1435 nsc_buf_t *h)
1436 {
1437 rdc_aio_t *aio;
1438 rdc_group_t *group;
1439 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1440 int rc;
1441
1442 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1443 if (!aio) {
1444 return (ENOMEM);
1445 }
1446
1447 group = krdc->group;
1448
1449 aio->pos = pos;
1450 aio->qpos = -1;
1451 aio->len = len;
1452 aio->flag = flag;
1453 aio->index = krdc->index;
1454 aio->handle = h;
1455
1456 if (group->flags & RDC_MEMQUE) {
1457 return (rdc_memq_enqueue(krdc, aio));
1458 } else if ((group->flags & RDC_DISKQUE) &&
1459 !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
1460 rc = rdc_diskq_enqueue(krdc, aio);
1461 kmem_free(aio, sizeof (*aio));
1462 return (rc);
1463 }
1464 return (-1); /* keep lint quiet */
1465 }
1466
1467
1468
1469
1470 /*
1471 * Async Network RDC flusher
1472 */
1473
1474 /*
1475 * don't allow any new writer threads to start if a member of the set
1476 * is disable pending
1477 */
1478 int
1479 is_disable_pending(rdc_k_info_t *krdc)
1480 {
1481 rdc_k_info_t *this = krdc;
1482 int rc = 0;
1483
1484 do {
1485 if (krdc->type_flag & RDC_DISABLEPEND) {
1486 krdc = this;
1487 rc = 1;
1488 break;
1489 }
1490 krdc = krdc->group_next;
1491
1492 } while (krdc != this);
1493
1494 return (rc);
1495 }
1496
1497 /*
1498 * rdc_writer -- spawn new writer if not running already
1499 * called after enqueing the dirty blocks
1500 */
1501 int
1502 rdc_writer(int index)
1503 {
1504 rdc_k_info_t *krdc = &rdc_k_info[index];
1505 nsthread_t *t;
1506 rdc_group_t *group;
1507 kmutex_t *qlock;
1508 int tries;
1509 const int MAX_TRIES = 16;
1510
1511 group = krdc->group;
1512
1513 if (RDC_IS_DISKQ(group))
1514 qlock = &group->diskq.disk_qlock;
1515 else
1516 qlock = &group->ra_queue.net_qlock;
1517
1518 mutex_enter(qlock);
1519
1520 #ifdef DEBUG
1521 if (noflush) {
1522 mutex_exit(qlock);
1523 return (0);
1524 }
1525 #endif
1526
1527 if ((group->rdc_writer) || is_disable_pending(krdc)) {
1528 mutex_exit(qlock);
1529 return (0);
1530 }
1531
1532 if ((group->rdc_thrnum >= 1) && (group->seqack == RDC_NEWSEQ)) {
1533 /*
1534 * We also need to check if we are starting a new
1535 * sequence, and if so don't create a new thread,
1536 * as we must ensure that the start of new sequence
1537 * requests arrives first to re-init the server.
1538 */
1539 mutex_exit(qlock);
1540 return (0);
1541 }
1542 /*
1543 * For version 6,
1544 * see if we can fit in another thread.
1545 */
1546 group->rdc_thrnum++;
1547
1548 if (krdc->intf && (krdc->intf->rpc_version >= RDC_VERSION6)) {
1549 rdc_u_info_t *urdc = &rdc_u_info[index];
1550 if (group->rdc_thrnum >= urdc->asyncthr)
1551 group->rdc_writer = 1;
1552 } else {
1553 group->rdc_writer = 1;
1554 }
1555
1556 mutex_exit(qlock);
1557
1558
1559 /*
1560 * If we got here, we know that we have not exceeded the allowed
1561 * number of async threads for our group. If we run out of threads
1562 * in _rdc_flset, we add a new thread to the set.
1563 */
1564 tries = 0;
1565 do {
1566 /* first try to grab a thread from the free list */
1567 if (t = nst_create(_rdc_flset, rdc_flusher_thread,
1568 (blind_t)(unsigned long)index, 0)) {
1569 break;
1570 }
1571
1572 /* that failed; add a thread to the set and try again */
1573 if (nst_add_thread(_rdc_flset, 1) != 1) {
1574 cmn_err(CE_WARN, "!rdc_writer index %d nst_add_thread "
1575 "error, tries: %d", index, tries);
1576 break;
1577 }
1578 } while (++tries < MAX_TRIES);
1579
1580 if (tries) {
1581 mutex_enter(&group->addthrnumlk);
1582 group->rdc_addthrnum += tries;
1583 mutex_exit(&group->addthrnumlk);
1584 }
1585
1586 if (t) {
1587 return (1);
1588 }
1589
1590 cmn_err(CE_WARN, "!rdc_writer: index %d nst_create error", index);
1591 rdc_many_enter(krdc);
1592 mutex_enter(qlock);
1593 group->rdc_thrnum--;
1594 group->rdc_writer = 0;
1595 if ((group->count == 0) && (group->rdc_thrnum == 0)) {
1596 mutex_exit(qlock);
1597 /*
1598 * Race with remove_from_group while write thread was
1599 * failing to be created.
1600 */
1601 #ifdef DEBUG
1602 cmn_err(CE_WARN, "!rdc_writer: group being destroyed");
1603 #endif
1604 rdc_delgroup(group);
1605 krdc->group = NULL;
1606 rdc_many_exit(krdc);
1607 return (-1);
1608 }
1609 mutex_exit(qlock);
1610 rdc_many_exit(krdc);
1611 return (-1);
1612 }
1613
1614 /*
1615 * Either we need to flush the
1616 * kmem (net_queue) queue or the disk (disk_queue)
1617 * determine which, and do it.
1618 */
1619 void
1620 rdc_flusher_thread(int index)
1621 {
1622 rdc_k_info_t *krdc = &rdc_k_info[index];
1623
1624 if (krdc->group->flags & RDC_MEMQUE) {
1625 rdc_flush_memq(index);
1626 return;
1627 } else if (krdc->group->flags & RDC_DISKQUE) {
1628 rdc_flush_diskq(index);
1629 return;
1630 } else { /* uh-oh, big time */
1631 cmn_err(CE_PANIC, "flusher trying to flush unknown queue type");
1632 }
1633
1634 }
1635
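/*
 * rdc_flush_memq
 * Memory queue flusher.  Repeatedly removes the aio at the head of
 * the group's memory queue and pushes it to the remote site with
 * _rdc_remote_flush(), until the queue drains or this thread has to
 * die (for example after a sequence number restart or loss of the
 * interface).  The last thread out of an empty group destroys it.
 */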
1636 void
1637 rdc_flush_memq(int index)
1638 {
1639 rdc_k_info_t *krdc = &rdc_k_info[index];
1640 rdc_aio_t *aio;
1641 net_queue *q;
1642 int dowork;
1643 rdc_group_t *group = krdc->group;
1644 if (!group || group->count == 0) {
1645 #ifdef DEBUG
1646 cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
1647 #endif
1648 return;
1649 }
1650
1651 if (!krdc->c_fd) {
1652 #ifdef DEBUG
1653 cmn_err(CE_WARN, "!rdc_flush_memq: no c_fd!");
1654 #endif
1655 goto thread_death;
1656 }
1657
1658 #ifdef DEBUG_DISABLE
1659 if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1660 cmn_err(CE_WARN, "!rdc_flush_memq: DISABLE PENDING!");
1661 /*
1662 * Need to continue as we may be trying to flush IO
1663 * while trying to disable or suspend
1664 */
1665 }
1666 #endif
1667
1668 q = &group->ra_queue;
1669
1670 dowork = 1;
1671 /* CONSTCOND */
1672 while (dowork) {
1673 if (net_exit == ATM_EXIT)
1674 break;
1675
1676 group = krdc->group;
1677 if (!group || group->count == 0) {
1678 #ifdef DEBUG
1679 cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
1680 #endif
1681 break;
1682 }
1683
1684 mutex_enter(&q->net_qlock);
1685 aio = q->net_qhead;
1686
1687 if (aio == NULL) {
1688 #ifdef DEBUG
1689 if (q->nitems != 0 ||
1690 q->blocks != 0 ||
1691 q->net_qtail != 0) {
1692 cmn_err(CE_PANIC,
1693 "rdc_flush_memq(1): q %p, q blocks %"
1694 NSC_SZFMT ", nitems %" NSC_SZFMT
1695 ", qhead %p qtail %p",
1696 (void *) q, q->blocks, q->nitems,
1697 (void *) aio, (void *) q->net_qtail);
1698 }
1699 #endif
1700 mutex_exit(&q->net_qlock);
1701 break;
1702 }
1703
1704 /* aio remove from q */
1705
1706 q->net_qhead = aio->next;
1707 aio->next = NULL;
1708
1709 if (q->net_qtail == aio)
1710 q->net_qtail = q->net_qhead;
1711
1712 q->blocks -= aio->len;
1713 q->nitems--;
1714
1715 /*
1716 * in flight numbers.
1717 */
1718 q->inflbls += aio->len;
1719 q->inflitems++;
1720
1721 #ifdef DEBUG
1722 if (q->net_qhead == NULL) {
1723 if (q->nitems != 0 ||
1724 q->blocks != 0 ||
1725 q->net_qtail != 0) {
1726 cmn_err(CE_PANIC,
1727 "rdc_flush_memq(2): q %p, q blocks %"
1728 NSC_SZFMT ", nitems %" NSC_SZFMT
1729 ", qhead %p qtail %p",
1730 (void *) q, q->blocks, q->nitems,
1731 (void *) q->net_qhead,
1732 (void *) q->net_qtail);
1733 }
1734 }
1735
1736 #ifndef NSC_MULTI_TERABYTE
1737 if (q->blocks < 0) {
1738 cmn_err(CE_PANIC,
1739 "rdc_flush_memq(3): q %p, q blocks %" NSC_SZFMT
1740 ", nitems %d, qhead %p, qtail %p",
1741 (void *) q, q->blocks, q->nitems,
1742 (void *) q->net_qhead, (void *) q->net_qtail);
1743 }
1744 #else
1745 /* blocks and nitems are unsigned for NSC_MULTI_TERABYTE */
1746 #endif
1747 #endif
1748
1749 mutex_exit(&q->net_qlock);
1750
1751 aio->iostatus = RDC_IO_INIT;
1752
1753 _rdc_remote_flush(aio);
1754
1755 mutex_enter(&q->net_qlock);
1756 q->inflbls -= aio->len;
1757 q->inflitems--;
1758 if ((group->seqack == RDC_NEWSEQ) &&
1759 (group->seq != RDC_NEWSEQ + 1)) {
1760 if ((q->net_qhead == NULL) ||
1761 (q->net_qhead->seq != RDC_NEWSEQ + 1)) {
1762 /*
1763 * We are an old thread, and the
1764 * queue sequence has been reset
1765 * during the network write above.
1766 * As such we mustn't pull another
1767 * job from the queue until the
1768 * first sequence message has been ack'ed.
1769 * Just die instead. Unless this thread
1770 * is the first sequence that has just
1771 * been ack'ed.
1772 */
1773 dowork = 0;
1774 }
1775 }
1776 mutex_exit(&q->net_qlock);
1777
1778 if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
1779 rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
1780 if (krdctmp->type_flag & RDC_DISABLEPEND) {
1781 kmem_free(aio, sizeof (*aio));
1782 goto thread_death;
1783 }
1784 rdc_group_enter(krdc);
1785 ASSERT(krdc->group);
1786 rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
1787 "memq flush aio status not RDC_IO_DONE");
1788 rdc_group_exit(krdc);
1789 rdc_dump_queue(aio->index);
1790 }
1791 kmem_free(aio, sizeof (*aio));
1792
1793 if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
1794 break;
1795 }
1796
1797 thread_death:
1798 rdc_many_enter(krdc);
1799 mutex_enter(&group->ra_queue.net_qlock);
1800 group->rdc_thrnum--;
1801 group->rdc_writer = 0;
1802 /*
1803 * all threads must be dead.
1804 */
1805 if ((group->count == 0) && (group->rdc_thrnum == 0)) {
1806 mutex_exit(&group->ra_queue.net_qlock);
1807 /*
1808 * Group now empty, so destroy
1809 * Race with remove_from_group while write thread was running
1810 */
1811 #ifdef DEBUG
1812 cmn_err(CE_WARN, "!rdc_flush_memq: group being destroyed");
1813 #endif
1814 rdc_delgroup(group);
1815 krdc->group = NULL;
1816 rdc_many_exit(krdc);
1817 return;
1818 }
1819 mutex_exit(&group->ra_queue.net_qlock);
1820 rdc_many_exit(krdc);
1821 }
1822
1823 /*
1824 * rdc_flush_diskq
1825 * disk queue flusher
1826 */
1827 void
1828 rdc_flush_diskq(int index)
1829 {
1830 rdc_k_info_t *krdc = &rdc_k_info[index];
1831 rdc_u_info_t *urdc = &rdc_u_info[index];
1832 rdc_aio_t *aio = NULL;
1833 disk_queue *q;
1834 net_queue *nq;
1835 int dowork;
1836 int rc;
1837 rdc_group_t *group = krdc->group;
1838
1839 if (!group || group->count == 0) {
1840 #ifdef DEBUG
1841 cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
1842 #endif
1843 return;
1844 }
1845
1846 if (!krdc->c_fd) {
1847 #ifdef DEBUG
1848 cmn_err(CE_WARN, "!rdc_flush_diskq: no c_fd!");
1849 #endif
1850 return;
1851 }
1852
1853 #ifdef DEBUG_DISABLE
1854 if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1855 cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING!");
1856 /*
1857 * Need to continue as we may be trying to flush IO
1858 * while trying to disable or suspend
1859 */
1860 }
1861 #endif
1862 q = &group->diskq;
1863 nq = &group->ra_queue;
1864
1865 if (IS_QSTATE(q, RDC_QDISABLEPEND) || IS_STATE(urdc, RDC_LOGGING)) {
1866 #ifdef DEBUG
1867 cmn_err(CE_NOTE, "!flusher thread death 1 %x", QSTATE(q));
1868 #endif
1869 goto thread_death;
1870 }
1871
1872 dowork = 1;
1873 /* CONSTCOND */
1874 while (dowork) {
1875 if (net_exit == ATM_EXIT)
1876 break;
1877
1878 group = krdc->group;
1879 if (!group || group->count == 0) {
1880 #ifdef DEBUG
1881 cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
1882 #endif
1883 break;
1884 }
1885
1886 do {
1887 rc = 0;
1888 if ((IS_STATE(urdc, RDC_LOGGING)) ||
1889 (IS_STATE(urdc, RDC_SYNCING)) ||
1890 (nq->qfflags & RDC_QFILLSLEEP))
1891 goto thread_death;
1892
1893 aio = rdc_dequeue(krdc, &rc);
1894
1895 if ((IS_STATE(urdc, RDC_LOGGING)) ||
1896 (IS_STATE(urdc, RDC_SYNCING)) ||
1897 (nq->qfflags & RDC_QFILLSLEEP)) {
1898 goto thread_death;
1899 }
1900 if (rc == EAGAIN) {
1901 delay(40);
1902 }
1903
1904 } while (rc == EAGAIN);
1905
1906 if (aio == NULL) {
1907 break;
1908 }
1909
1910 aio->iostatus = RDC_IO_INIT;
1911
1912 mutex_enter(QLOCK(q));
1913 q->inflbls += aio->len;
1914 q->inflitems++;
1915 mutex_exit(QLOCK(q));
1916
1917 _rdc_remote_flush(aio);
1918
1919 mutex_enter(QLOCK(q));
1920 q->inflbls -= aio->len;
1921 q->inflitems--;
1922
1923 if ((group->seqack == RDC_NEWSEQ) &&
1924 (group->seq != RDC_NEWSEQ + 1)) {
1925 if ((nq->net_qhead == NULL) ||
1926 (nq->net_qhead->seq != RDC_NEWSEQ + 1)) {
1927 /*
1928 * We are an old thread, and the
1929 * queue sequence has been reset
1930 * during the network write above.
1931 * As such we mustn't pull another
1932 * job from the queue until the
1933 * first sequence message has been ack'ed.
1934 * Just die instead. Unless of course,
1935 * this thread is the first sequence that
1936 * has just been ack'ed.
1937 */
1938 dowork = 0;
1939 }
1940 }
1941 mutex_exit(QLOCK(q));
1942
1943 if (aio->iostatus == RDC_IO_CANCELLED) {
1944 rdc_dump_queue(aio->index);
1945 kmem_free(aio, sizeof (*aio));
1946 aio = NULL;
1947 if (group) { /* seq gets bumped on dequeue */
1948 mutex_enter(QLOCK(q));
1949 rdc_dump_iohdrs(q);
1950 SET_QNXTIO(q, QHEAD(q));
1951 SET_QCOALBOUNDS(q, QHEAD(q));
1952 group->seq = RDC_NEWSEQ;
1953 group->seqack = RDC_NEWSEQ;
1954 mutex_exit(QLOCK(q));
1955 }
1956 break;
1957 }
1958
1959 if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
1960 rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
1961 if (krdctmp->type_flag & RDC_DISABLEPEND) {
1962 kmem_free(aio, sizeof (*aio));
1963 aio = NULL;
1964 goto thread_death;
1965 }
1966 rdc_group_enter(krdc);
1967 rdc_group_log(krdc,
1968 RDC_NOFLUSH | RDC_ALLREMOTE | RDC_QUEUING,
1969 "diskq flush aio status not RDC_IO_DONE");
1970 rdc_group_exit(krdc);
1971 rdc_dump_queue(aio->index);
1972 }
1973
1974 kmem_free(aio, sizeof (*aio));
1975 aio = NULL;
1976
1977 #ifdef DEBUG_DISABLE
1978 if (krdc->type_flag & RDC_DISABLEPEND) {
1979 cmn_err(CE_WARN,
1980 "!rdc_flush_diskq: DISABLE PENDING after IO!");
1981 }
1982 #endif
1983 if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
1984 break;
1985
1986 if (IS_QSTATE(q, RDC_QDISABLEPEND)) {
1987 #ifdef DEBUG
1988 cmn_err(CE_NOTE, "!flusher thread death 2");
1989 #endif
1990 break;
1991 }
1992 }
1993 thread_death:
1994 rdc_many_enter(krdc);
1995 mutex_enter(QLOCK(q));
1996 group->rdc_thrnum--;
1997 group->rdc_writer = 0;
1998
1999 if (aio && aio->qhandle) {
2000 aio->qhandle->sb_user--;
2001 if (aio->qhandle->sb_user == 0) {
2002 (void) _rdc_rsrv_diskq(krdc->group);
2003 rdc_fixlen(aio);
2004 (void) nsc_free_buf(aio->qhandle);
2005 aio->qhandle = NULL;
2006 aio->handle = NULL;
2007 _rdc_rlse_diskq(krdc->group);
2008 }
2009 }
2010 if ((group->count == 0) && (group->rdc_thrnum == 0)) {
2011 mutex_exit(QLOCK(q));
2012 /*
2013 * Group now empty, so destroy
2014 * Race with remove_from_group while write thread was running
2015 */
2016 #ifdef DEBUG
2017 cmn_err(CE_WARN, "!rdc_flush_diskq: group being destroyed");
2018 #endif
2019 mutex_enter(&group->diskqmutex);
2020 rdc_close_diskq(group);
2021 mutex_exit(&group->diskqmutex);
2022 rdc_delgroup(group);
2023 krdc->group = NULL;
2024 rdc_many_exit(krdc);
2025 return;
2026 }
2027 mutex_exit(QLOCK(q));
2028 rdc_many_exit(krdc);
2029 }
2030
2031 /*
2032 * _rdc_remote_flush
2033 * Flush a single ANON block
2034 * this function will flush from either the disk queue
2035 * or the memory queue. The appropriate locks must be
2036 * taken out etc, etc ...
2037 */
2038 static void
2039 _rdc_remote_flush(rdc_aio_t *aio)
2040 {
2041 rdc_k_info_t *krdc = &rdc_k_info[aio->index];
2042 rdc_u_info_t *urdc = &rdc_u_info[aio->index];
2043 disk_queue *q = &krdc->group->diskq;
2044 kmutex_t *qlock;
2045 rdc_group_t *group;
2046 nsc_buf_t *h = NULL;
2047 int reserved = 0;
2048 int rtype = RDC_RAW;
2049 int rc;
2050 uint_t maxseq;
2051 struct netwriteres netret;
2052 int waitq = 1;
2053 int vflags;
2054
2055 group = krdc->group;
2056 netret.vecdata.vecdata_val = NULL;
2057 netret.vecdata.vecdata_len = 0;
2058
2059 /* Where did we get this aio from anyway? */
2060 if (RDC_IS_DISKQ(group)) {
2061 qlock = &group->diskq.disk_qlock;
2062 } else {
2063 qlock = &group->ra_queue.net_qlock;
2064 }
2065
2066 /*
2067 * quench transmission if we are too far ahead of the
2068 * server Q, or it will overflow.
2069 * Must fail all requests while asyncdis is set.
2070 * It will be cleared when the last thread to be discarded
2071 * sets the asyncstall counter to zero.
2072 * Note the thread within rdc_net_write
2073 * also bumps the asyncstall counter.
2074 */
2075
2076 mutex_enter(qlock);
2077 if (group->asyncdis) {
2078 aio->iostatus = RDC_IO_CANCELLED;
2079 mutex_exit(qlock);
2080 goto failed;
2081 }
2082 /* don't go to sleep if we have gone logging! */
2083 vflags = rdc_get_vflags(urdc);
2084 if ((vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2085 if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2086 aio->iostatus = RDC_IO_CANCELLED;
2087
2088 mutex_exit(qlock);
2089 goto failed;
2090 }
2091
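/*
 * wait here until our sequence number is within RDC_MAXPENDQ of the
 * last sequence number acked by the server (allowing for sequence
 * wrap), so that we do not run too far ahead of the server queue.
 */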
2092 while (maxseq = group->seqack + RDC_MAXPENDQ + 1,
2093 maxseq = (maxseq < group->seqack) ? maxseq + RDC_NEWSEQ + 1
2094 : maxseq, !RDC_INFRONT(aio->seq, maxseq)) {
2095 group->asyncstall++;
2096 ASSERT(!IS_STATE(urdc, RDC_LOGGING));
2097 cv_wait(&group->asyncqcv, qlock);
2098 group->asyncstall--;
2099 ASSERT(group->asyncstall >= 0);
2100 if (group->asyncdis) {
2101 if (group->asyncstall == 0) {
2102 group->asyncdis = 0;
2103 }
2104 aio->iostatus = RDC_IO_CANCELLED;
2105 mutex_exit(qlock);
2106 goto failed;
2107 }
2108 /*
2109 * See if we have gone into logging mode
2110 * since sleeping.
2111 */
2112 vflags = rdc_get_vflags(urdc);
2113 if (vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING)) {
2114 if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2115 aio->iostatus = RDC_IO_CANCELLED;
2116
2117 mutex_exit(qlock);
2118 goto failed;
2119 }
2120 }
2121 mutex_exit(qlock);
2122
2123 if ((krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2124 mutex_enter(krdc->io_kstats->ks_lock);
2125 kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2126 mutex_exit(krdc->io_kstats->ks_lock);
2127 waitq = 0;
2128 }
2129
2130
2131 rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
2132 if (rc != 0) {
2133 #ifdef DEBUG
2134 cmn_err(CE_WARN, "!_rdc_remote_flush: reserve, index %d, rc %d",
2135 aio->index, rc);
2136 #endif
2137 goto failed;
2138 }
2139
2140 reserved = 1;
2141 /*
2142 * Case where we are multihop and calling with no ANON bufs.
2143 * Need to do the read to fill the buf.
2144 */
2145 if (!aio->handle) {
2146 rc = nsc_alloc_buf(RDC_U_FD(krdc), aio->pos, aio->len,
2147 (aio->flag & ~NSC_WRITE) | NSC_READ, &h);
2148 if (!RDC_SUCCESS(rc)) {
2149 #ifdef DEBUG
2150 cmn_err(CE_WARN,
2151 "!_rdc_remote_flush: alloc_buf, index %d, pos %"
2152 NSC_SZFMT ", len %" NSC_SZFMT ", rc %d",
2153 aio->index, aio->pos, aio->len, rc);
2154 #endif
2155
2156 goto failed;
2157 }
2158 aio->handle = h;
2159 aio->handle->sb_user = RDC_NULLBUFREAD;
2160 }
2161
2162 mutex_enter(qlock);
2163 if (group->asyncdis) {
2164 if (group->asyncstall == 0) {
2165 group->asyncdis = 0;
2166 }
2167 aio->iostatus = RDC_IO_CANCELLED;
2168 mutex_exit(qlock);
2169 goto failed;
2170 }
2171 group->asyncstall++;
2172 mutex_exit(qlock);
2173
2174
2175 if (krdc->remote_index < 0) {
2176 /*
2177 * This should be ok; we are flushing, not reverse syncing.
2178 * remote_index could be -1 if we lost a race with
2179 * resume and the flusher tries to flush an io from
2180 * another set that has not yet resumed.
2181 */
2182 krdc->remote_index = rdc_net_state(krdc->index, CCIO_SLAVE);
2183 DTRACE_PROBE1(remote_index_negative, int, krdc->remote_index);
2184
2185 }
2186
2187 /*
2188 * Double check for logging here, since there is no check in
2189 * net_write().  Skip the write if we can; if we have gone
2190 * logging, avoid clearing the bit, as we don't know whose
2191 * bit it may also be.
2192 */
2193 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2194 aio->iostatus = RDC_IO_CANCELLED;
2195 mutex_enter(qlock);
2196 group->asyncstall--;
2197 mutex_exit(qlock);
2198 goto failed;
2199 }
2200
2201 rc = rdc_net_write(krdc->index, krdc->remote_index,
2202 aio->handle, aio->pos, aio->len, aio->seq, aio->qpos, &netret);
2203
2204 mutex_enter(qlock);
2205 group->asyncstall--;
2206 if (group->asyncdis) {
2207 if (group->asyncstall == 0) {
2208 group->asyncdis = 0;
2209 }
2210 aio->iostatus = RDC_IO_CANCELLED;
2211 mutex_exit(qlock);
2212 goto failed;
2213 }
2214
2215 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2216 mutex_exit(qlock);
2217 aio->iostatus = RDC_IO_CANCELLED;
2218 goto failed;
2219 }
2220
2221 ASSERT(aio->handle);
2222 if (rc != 0) {
2223 #ifdef DEBUG
2224 cmn_err(CE_WARN,
2225 "!_rdc_remote_flush: write, index %d, pos %" NSC_SZFMT
2226 ", len %" NSC_SZFMT ", "
2227 "rc %d seq %u group seq %u seqack %u qpos %" NSC_SZFMT,
2228 aio->index, aio->pos, aio->len, rc, aio->seq,
2229 group->seq, group->seqack, aio->qpos);
2230 #endif
2231 if (rc == ENOLINK) {
2232 cmn_err(CE_WARN,
2233 "!Hard timeout detected (%d sec) "
2234 "on SNDR set %s:%s",
2235 rdc_rpc_tmout, urdc->secondary.intf,
2236 urdc->secondary.file);
2237 }
2238 mutex_exit(qlock);
2239 goto failed;
2240 } else {
2241 aio->iostatus = RDC_IO_DONE;
2242 }
2243
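/*
 * For a disk queue the data buffer (qhandle) may be shared by several
 * aio requests read from the queue together; sb_user counts the
 * outstanding users and the last one releases it (rdc_fixlen() then
 * nsc_free_buf()) under a diskq reservation.  A memory queue buffer
 * is private to this aio and is simply freed.
 */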
2244 if (RDC_IS_DISKQ(group)) {
2245 /* free locally alloc'd handle */
2246 if (aio->handle->sb_user == RDC_NULLBUFREAD) {
2247 (void) nsc_free_buf(aio->handle);
2248 aio->handle = NULL;
2249 }
2250 aio->qhandle->sb_user--;
2251 if (aio->qhandle->sb_user == 0) {
2252 (void) _rdc_rsrv_diskq(group);
2253 rdc_fixlen(aio);
2254 (void) nsc_free_buf(aio->qhandle);
2255 aio->qhandle = NULL;
2256 aio->handle = NULL;
2257 _rdc_rlse_diskq(group);
2258 }
2259
2260 } else {
2261 (void) nsc_free_buf(aio->handle);
2262 aio->handle = NULL;
2263 }
2264
2265 mutex_exit(qlock);
2266
2267 _rdc_rlse_devs(krdc, rtype);
2268
2269 if (netret.result == 0) {
2270 vflags = rdc_get_vflags(urdc);
2271
2272 if (!(vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2273 RDC_CLR_BITMAP(krdc, aio->pos, aio->len, \
2274 0xffffffff, RDC_BIT_BUMP);
2275
2276 if (RDC_IS_DISKQ(krdc->group)) {
2277 if (!IS_STATE(urdc, RDC_LOGGING)) {
2278 /* tell queue data has been flushed */
2279 rdc_clr_iohdr(krdc, aio->qpos);
2280 } else { /* throw away queue, logging */
2281 mutex_enter(qlock);
2282 rdc_dump_iohdrs(q);
2283 SET_QNXTIO(q, QHEAD(q));
2284 SET_QCOALBOUNDS(q, QHEAD(q));
2285 mutex_exit(qlock);
2286 }
2287 }
2288 }
2289
2290 mutex_enter(qlock);
2291 /*
2292 * Check to see if the reply has arrived out of
2293 * order, if so don't update seqack.
2294 */
2295 if (!RDC_INFRONT(aio->seq, group->seqack)) {
2296 group->seqack = aio->seq;
2297 }
2298 #ifdef DEBUG
2299 else {
2300 rdc_ooreply++;
2301 }
2302 #endif
2303 if (group->asyncstall) {
2304 cv_broadcast(&group->asyncqcv);
2305 }
2306 mutex_exit(qlock);
2307 } else if (netret.result < 0) {
2308 aio->iostatus = RDC_IO_FAILED;
2309 }
2310
2311 /*
2312 * see if we have any pending async requests we can mark
2313 * as done.
2314 */
2315
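/*
 * Each net_pendvec_t entry names an earlier request that the server
 * has now committed: clear its bitmap bits (or dump the queue headers
 * if that set has gone logging), advance seqack when the entry is in
 * order, and wake any transmitters stalled on the window.
 */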
2316 if (netret.vecdata.vecdata_len) {
2317 net_pendvec_t *vecp;
2318 net_pendvec_t *vecpe;
2319 vecp = netret.vecdata.vecdata_val;
2320 vecpe = netret.vecdata.vecdata_val + netret.vecdata.vecdata_len;
2321 while (vecp < vecpe) {
2322 rdc_k_info_t *krdcp = &rdc_k_info[vecp->pindex];
2323 rdc_u_info_t *urdcp = &rdc_u_info[vecp->pindex];
2324 /*
2325 * we must always still be in the same group.
2326 */
2327 ASSERT(krdcp->group == group);
2328 vflags = rdc_get_vflags(urdcp);
2329
2330 if (!(vflags &
2331 (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2332 RDC_CLR_BITMAP(krdcp, vecp->apos, vecp->alen, \
2333 0xffffffff, RDC_BIT_BUMP);
2334 if (RDC_IS_DISKQ(krdcp->group)) {
2335 if (!IS_STATE(urdc, RDC_LOGGING)) {
2336 /* update queue info */
2337 rdc_clr_iohdr(krdc, vecp->qpos);
2338 } else { /* we've gone logging */
2339 mutex_enter(qlock);
2340 rdc_dump_iohdrs(q);
2341 SET_QNXTIO(q, QHEAD(q));
2342 SET_QCOALBOUNDS(q, QHEAD(q));
2343 mutex_exit(qlock);
2344 }
2345 }
2346 }
2347
2348 /*
2349 * see if we can re-start transmission
2350 */
2351 mutex_enter(qlock);
2352 if (!RDC_INFRONT(vecp->seq, group->seqack)) {
2353 group->seqack = vecp->seq;
2354 }
2355 #ifdef DEBUG
2356 else {
2357 rdc_ooreply++;
2358 }
2359 #endif
2360 DTRACE_PROBE1(pendvec_return, int, vecp->seq);
2361
2362 if (group->asyncstall) {
2363 cv_broadcast(&group->asyncqcv);
2364 }
2365 mutex_exit(qlock);
2366 vecp++;
2367 }
2368 }
2369 if (netret.vecdata.vecdata_val)
2370 kmem_free(netret.vecdata.vecdata_val,
2371 netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2372 return;
2373 failed:
2374
2375 /* perhaps we have a few threads stuck .. */
2376 if (group->asyncstall) {
2377 group->asyncdis = 1;
2378 cv_broadcast(&group->asyncqcv);
2379 }
2380 if (netret.vecdata.vecdata_val)
2381 kmem_free(netret.vecdata.vecdata_val,
2382 netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2383
2384 mutex_enter(qlock);
2385 if (RDC_IS_DISKQ(group)) {
2386 /* free locally alloc'd handle */
2387 if ((aio->handle) &&
2388 (aio->handle->sb_user == RDC_NULLBUFREAD)) {
2389 (void) nsc_free_buf(aio->handle);
2390 aio->handle = NULL;
2391 }
2392 aio->qhandle->sb_user--;
2393 if (aio->qhandle->sb_user == 0) {
2394 (void) _rdc_rsrv_diskq(group);
2395 rdc_fixlen(aio);
2396 (void) nsc_free_buf(aio->qhandle);
2397 aio->qhandle = NULL;
2398 aio->handle = NULL;
2399 _rdc_rlse_diskq(group);
2400 }
2401 } else {
2402 if (aio->handle) {
2403 (void) nsc_free_buf(aio->handle);
2404 aio->handle = NULL;
2405 }
2406 }
2407 mutex_exit(qlock);
2408
2409 if (reserved) {
2410 _rdc_rlse_devs(krdc, rtype);
2411 }
2412
2413 if ((waitq && krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2414 mutex_enter(krdc->io_kstats->ks_lock);
2415 kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2416 mutex_exit(krdc->io_kstats->ks_lock);
2417 }
2418
2419 /* make sure that the bit is still set */
2420 RDC_CHECK_BIT(krdc, aio->pos, aio->len);
2421
2422 if (aio->iostatus != RDC_IO_CANCELLED)
2423 aio->iostatus = RDC_IO_FAILED;
2424 }
2425
2426
2427 /*
2428 * rdc_drain_disk_queue
2429 * Drain the async disk queue for the whole group.  Bail out if no
2430 * progress is made for NUM_RETRIES consecutive seconds.
2431 * Returns -1 if it bails before the queue is drained, 0 otherwise.
2432 */
2433 #define NUM_RETRIES 15 /* Number of retries to wait if no progress */
2434 int
2435 rdc_drain_disk_queue(int index)
2436 {
2437 rdc_k_info_t *krdc = &rdc_k_info[index];
2438 volatile rdc_group_t *group;
2439 volatile disk_queue *diskq;
2440 int threads, counter;
2441 long blocks;
2442
2443 /* Sanity checking */
2444 if (index > rdc_max_sets)
2445 return (0);
2446
2447 /*
2448 * If there is no group or diskq configured, we can leave now
2449 */
2450 if (!(group = krdc->group) || !(diskq = &group->diskq))
2451 return (0);
2452
2453 /*
2454 * No need to wait if EMPTY and threads are gone
2455 */
2456 counter = 0;
2457 while (!QEMPTY(diskq) || group->rdc_thrnum) {
2458
2459 /*
2460 * Capture counters to determine if progress is being made
2461 */
2462 blocks = QBLOCKS(diskq);
2463 threads = group->rdc_thrnum;
2464
2465 /*
2466 * Wait
2467 */
2468 delay(HZ);
2469
2470 /*
2471 * Has the group or disk queue gone away while delayed?
2472 */
2473 if (!(group = krdc->group) || !(diskq = &group->diskq))
2474 return (0);
2475
2476 /*
2477 * Are we still seeing progress?
2478 */
2479 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
2480 /*
2481 * No progress seen, increment retry counter
2482 */
2483 if (counter++ > NUM_RETRIES) {
2484 return (-1);
2485 }
2486 } else {
2487 /*
2488 * Reset counter, as we've made progress
2489 */
2490 counter = 0;
2491 }
2492 }
2493
2494 return (0);
2495 }
2496
2497 /*
2498 * decide what needs to be drained, disk or core
2499 * and drain it
2500 */
2501 int
2502 rdc_drain_queue(int index)
2503 {
2504 rdc_k_info_t *krdc = &rdc_k_info[index];
2505 rdc_group_t *group = krdc->group;
2506
2507 if (!group)
2508 return (0);
2509
2510 if (RDC_IS_DISKQ(group))
2511 return (rdc_drain_disk_queue(index));
2512 if (RDC_IS_MEMQ(group))
2513 return (rdc_drain_net_queue(index));
2514 /* oops.. */
2515 #ifdef DEBUG
2516 cmn_err(CE_WARN, "!rdc_drain_queue: "
2517 "attempting drain of unknown Q type");
2518 #endif
2519 return (0);
2520 }
2521
2522 /*
2523 * rdc_drain_net_queue
2524 * drain the async network queue for the whole group. Bail out if nothing
2525 * happens in 20 sec
2526 * returns -1 if it bails before the queues are drained.
2527 */
2528 int
2529 rdc_drain_net_queue(int index)
2530 {
2531 rdc_k_info_t *krdc = &rdc_k_info[index];
2532 volatile net_queue *q;
2533 int bail = 20; /* bail out in about 20 secs */
2534 nsc_size_t blocks;
2535
2536 /* Sanity checking */
2537 if (index > rdc_max_sets)
2538 return (0);
2539 if (!krdc->group)
2540 return (0);
2541 /* LINTED */
2542 if (!(q = &krdc->group->ra_queue))
2543 return (0);
2544
2545 /* CONSTCOND */
2546 while (1) {
2547
2548 if (((volatile rdc_aio_t *)q->net_qhead == NULL) &&
2549 (krdc->group->rdc_thrnum == 0)) {
2550 break;
2551 }
2552
2553 blocks = q->blocks;
2554
2555 q = (volatile net_queue *)&krdc->group->ra_queue;
2556
2557 if ((blocks == q->blocks) &&
2558 (--bail <= 0)) {
2559 break;
2560 }
2561
2562 delay(HZ);
2563 }
2564
2565 if (bail <= 0)
2566 return (-1);
2567
2568 return (0);
2569 }
2570
2571 /*
2572 * rdc_dump_queue
2573 * We want to release all the blocks currently on the network flushing queue
2574 * We already have them logged in the bitmap.
2575 */
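/*
 * For a disk queue the diskq->memq filler thread is first put to
 * sleep, then both queue locks are taken, the group sequence numbers
 * are reset to RDC_NEWSEQ, the on-disk io headers are dumped and the
 * in-memory chain of rdc_aio_t entries is freed.  Any threads stalled
 * on the flow-control window are discarded via asyncdis.
 */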
2576 void
2577 rdc_dump_queue(int index)
2578 {
2579 rdc_k_info_t *krdc = &rdc_k_info[index];
2580 rdc_aio_t *aio;
2581 net_queue *q;
2582 rdc_group_t *group;
2583 disk_queue *dq;
2584 kmutex_t *qlock;
2585
2586 group = krdc->group;
2587
2588 q = &group->ra_queue;
2589 dq = &group->diskq;
2590
2591 /*
2592 * gotta have both locks here for diskq
2593 */
2594
2595 if (RDC_IS_DISKQ(group)) {
2596 mutex_enter(&q->net_qlock);
2597 if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
2598 int tries = 3;
2599 #ifdef DEBUG_DISKQ
2600 cmn_err(CE_NOTE,
2601 "!dumpq sending diskq->memq flusher to sleep");
2602 #endif
2603 q->qfflags |= RDC_QFILLSLEEP;
2604 mutex_exit(&q->net_qlock);
2605 while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
2606 delay(5);
2607 mutex_enter(&q->net_qlock);
2608 }
2609 }
2610
2611 if (RDC_IS_DISKQ(group)) {
2612 qlock = &dq->disk_qlock;
2613 (void) _rdc_rsrv_diskq(group);
2614 } else {
2615 qlock = &q->net_qlock;
2616 }
2617
2618 mutex_enter(qlock);
2619
2620 group->seq = RDC_NEWSEQ; /* reset the sequence number */
2621 group->seqack = RDC_NEWSEQ;
2622
2623 /* if the q is on disk, dump the q->iohdr chain */
2624 if (RDC_IS_DISKQ(group)) {
2625 rdc_dump_iohdrs(dq);
2626
2627 /* back up the nxtio pointer */
2628 SET_QNXTIO(dq, QHEAD(dq));
2629 SET_QCOALBOUNDS(dq, QHEAD(dq));
2630 }
2631
2632 while (q->net_qhead) {
2633 rdc_k_info_t *tmpkrdc;
2634 aio = q->net_qhead;
2635 tmpkrdc = &rdc_k_info[aio->index];
2636
2637 if (RDC_IS_DISKQ(group)) {
2638 aio->qhandle->sb_user--;
2639 if (aio->qhandle->sb_user == 0) {
2640 rdc_fixlen(aio);
2641 (void) nsc_free_buf(aio->qhandle);
2642 aio->qhandle = NULL;
2643 aio->handle = NULL;
2644 }
2645 } else {
2646 if (aio->handle) {
2647 (void) nsc_free_buf(aio->handle);
2648 aio->handle = NULL;
2649 }
2650 }
2651
2652 q->net_qhead = aio->next;
2653 RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);
2654
2655 kmem_free(aio, sizeof (*aio));
2656 if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(group)) {
2657 mutex_enter(tmpkrdc->io_kstats->ks_lock);
2658 kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
2659 mutex_exit(tmpkrdc->io_kstats->ks_lock);
2660 }
2661
2662 }
2663
2664 q->net_qtail = NULL;
2665 q->blocks = 0;
2666 q->nitems = 0;
2667
2668 /*
2669 * See if we have stalled threads.
2670 */
2671 done:
2672 if (group->asyncstall) {
2673 group->asyncdis = 1;
2674 cv_broadcast(&group->asyncqcv);
2675 }
2676 mutex_exit(qlock);
2677 if (RDC_IS_DISKQ(group)) {
2678 mutex_exit(&q->net_qlock);
2679 _rdc_rlse_diskq(group);
2680 }
2681
2682 }
2683
2684
2685 /*
2686 * rdc_clnt_get
2687 * Get a CLIENT handle and cache it
2688 */
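/*
 * The cache is protected by rdc_clnt_lock.  rdc_chtable chains one
 * entry per (program, version, device, protocol family) key via
 * ch_next, and each key chains its cached handles via ch_list; a free
 * handle is marked ch_inuse == FALSE.  Once MAXCLIENTS handles exist
 * for a key, a temporary uncached handle is created instead and is
 * destroyed again by rdc_clnt_free().
 */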
2689
2690 static int
2691 rdc_clnt_get(rdc_srv_t *svp, rpcvers_t vers, struct chtab **rch, CLIENT **clp)
2692 {
2693 uint_t max_msgsize;
2694 int retries;
2695 int ret;
2696 struct cred *cred;
2697 int num_clnts = 0;
2698 register struct chtab *ch;
2699 struct chtab **plistp;
2700 CLIENT *client = 0;
2701
2702 if (rch) {
2703 *rch = 0;
2704 }
2705
2706 if (clp) {
2707 *clp = 0;
2708 }
2709
2710 retries = 6; /* Never used for COTS in Solaris */
2711 cred = ddi_get_cred();
2712 max_msgsize = RDC_RPC_MAX;
2713
2714 mutex_enter(&rdc_clnt_lock);
2715
2716 ch = rdc_chtable;
2717 plistp = &rdc_chtable;
2718
2719 /* find the right ch_list chain */
2720
2721 for (ch = rdc_chtable; ch != NULL; ch = ch->ch_next) {
2722 if (ch->ch_prog == RDC_PROGRAM &&
2723 ch->ch_vers == vers &&
2724 ch->ch_dev == svp->ri_knconf->knc_rdev &&
2725 ch->ch_protofmly != NULL &&
2726 strcmp(ch->ch_protofmly,
2727 svp->ri_knconf->knc_protofmly) == 0) {
2728 /* found the correct chain to walk */
2729 break;
2730 }
2731 plistp = &ch->ch_next;
2732 }
2733
2734 if (ch != NULL) {
2735 /* walk the ch_list and try and find a free client */
2736
2737 for (num_clnts = 0; ch != NULL; ch = ch->ch_list, num_clnts++) {
2738 if (ch->ch_inuse == FALSE) {
2739 /* suitable handle to reuse */
2740 break;
2741 }
2742 plistp = &ch->ch_list;
2743 }
2744 }
2745
2746 if (ch == NULL && num_clnts >= MAXCLIENTS) {
2747 /* alloc a temporary handle and return */
2748
2749 rdc_clnt_toomany++;
2750 mutex_exit(&rdc_clnt_lock);
2751
2752 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2753 RDC_PROGRAM, vers, max_msgsize, retries, cred, &client);
2754
2755 if (ret != 0) {
2756 cmn_err(CE_NOTE,
2757 "!rdc_call: tli_kcreate failed %d", ret);
2758 return (ret);
2759 }
2760
2761 *rch = 0;
2762 *clp = client;
2763 (void) CLNT_CONTROL(client, CLSET_PROGRESS, NULL);
2764 return (ret);
2765 }
2766
2767 if (ch != NULL) {
2768 /* reuse a cached handle */
2769
2770 ch->ch_inuse = TRUE;
2771 ch->ch_timesused++;
2772 mutex_exit(&rdc_clnt_lock);
2773
2774 *rch = ch;
2775
2776 if (ch->ch_client == NULL) {
2777 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2778 RDC_PROGRAM, vers, max_msgsize, retries,
2779 cred, &ch->ch_client);
2780 if (ret != 0) {
2781 ch->ch_inuse = FALSE;
2782 return (ret);
2783 }
2784
2785 (void) CLNT_CONTROL(ch->ch_client, CLSET_PROGRESS,
2786 NULL);
2787 *clp = ch->ch_client;
2788
2789 return (0);
2790 } else {
2791 /*
2792 * Consecutive calls to CLNT_CALL() on the same client handle
2793 * get the same transaction ID. We want a new xid per call,
2794 * so we first reinitialise the handle.
2795 */
2796 (void) clnt_tli_kinit(ch->ch_client, svp->ri_knconf,
2797 &(svp->ri_addr), max_msgsize, retries, cred);
2798
2799 *clp = ch->ch_client;
2800 return (0);
2801 }
2802 }
2803
2804 /* create new handle and cache it */
2805 ch = (struct chtab *)kmem_zalloc(sizeof (*ch), KM_SLEEP);
2806
2807 if (ch) {
2808 ch->ch_inuse = TRUE;
2809 ch->ch_prog = RDC_PROGRAM;
2810 ch->ch_vers = vers;
2811 ch->ch_dev = svp->ri_knconf->knc_rdev;
2812 ch->ch_protofmly = (char *)kmem_zalloc(
2813 strlen(svp->ri_knconf->knc_protofmly)+1, KM_SLEEP);
2814 if (ch->ch_protofmly)
2815 (void) strcpy(ch->ch_protofmly,
2816 svp->ri_knconf->knc_protofmly);
2817 *plistp = ch;
2818 }
2819
2820 mutex_exit(&rdc_clnt_lock);
2821
2822 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2823 RDC_PROGRAM, vers, max_msgsize, retries, cred, clp);
2824
2825 if (ret != 0) {
2826 if (ch)
2827 ch->ch_inuse = FALSE;
2828 cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret);
2829 return (ret);
2830 }
2831
2832 *rch = ch;
2833 if (ch)
2834 ch->ch_client = *clp;
2835
2836 (void) CLNT_CONTROL(*clp, CLSET_PROGRESS, NULL);
2837
2838 return (ret);
2839 }
2840
2841
2842 long rdc_clnt_count = 0;
2843
2844 /*
2845 * rdc_clnt_call
2846 * Arguments:
2847 * rdc_srv_t *svp - rdc servinfo
2848 * rpcproc_t proc; - rpcid
2849 * rpcvers_t vers; - protocol version
2850 * xdrproc_t xargs;- xdr function
2851 * caddr_t argsp;- args to xdr function
2852 * xdrproc_t xres;- xdr function
2853 * caddr_t resp;- args to xdr function
2854 * struct timeval timeout;
2855 * Performs RPC client call using specific protocol and version
2856 */
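/*
 * For illustration only: a minimal sketch modeled on the callers in
 * this file (see rdc_async6() below), where a version 6 write would be
 * issued roughly as
 *
 *	t.tv_sec = rdc_rpc_tmout;
 *	t.tv_usec = 0;
 *	rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
 *	    xdr_net_data6, (char *)&data6, xdr_netwriteres,
 *	    (char *)&netret, &t);
 *
 * with data6, netret and t supplied by the caller as in rdc_async6().
 * The return value is RPC_SUCCESS (0) on success, another RPC error
 * code on failure, or an errno from client handle creation.
 */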
2857
2858 int
2859 rdc_clnt_call(rdc_srv_t *svp, rpcproc_t proc, rpcvers_t vers,
2860 xdrproc_t xargs, caddr_t argsp,
2861 xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2862 {
2863 CLIENT *rh = NULL;
2864 int err;
2865 int tries = 0;
2866 struct chtab *ch = NULL;
2867
2868 err = rdc_clnt_get(svp, vers, &ch, &rh);
2869 if (err || !rh)
2870 return (err);
2871
2872 do {
2873 DTRACE_PROBE3(rdc_clnt_call_1,
2874 CLIENT *, rh, rpcproc_t, proc, xdrproc_t, xargs);
2875
2876 err = cl_call_sig(rh, proc, xargs, argsp, xres, resp, *timeout);
2877
2878 DTRACE_PROBE1(rdc_clnt_call_end, int, err);
2879
2880 switch (err) {
2881 case RPC_SUCCESS: /* bail now */
2882 goto done;
2883 case RPC_INTR: /* No recovery from this */
2884 goto done;
2885 case RPC_PROGVERSMISMATCH:
2886 goto done;
2887 case RPC_TLIERROR:
2888 /* fall thru */
2889 case RPC_XPRTFAILED:
2890 /* Delay here to err on side of caution */
2891 /* fall thru */
2892 case RPC_VERSMISMATCH:
2893
2894 default:
2895 if (IS_UNRECOVERABLE_RPC(err)) {
2896 goto done;
2897 }
2898 tries++;
2899 /*
2900 * The call is in progress (over COTS)
2901 * Try the CLNT_CALL again, but don't
2902 * print a noisy error message
2903 */
2904 if (err == RPC_INPROGRESS)
2905 break;
2906 cmn_err(CE_NOTE, "!SNDR client: err %d %s",
2907 err, clnt_sperrno(err));
2908 }
2909 } while (tries && (tries < 2));
2910 done:
2911 ++rdc_clnt_count;
2912 rdc_clnt_free(ch, rh);
2913 return (err);
2914 }
2915
2916
2917 /*
2918 * Call an rpc from the client side, not caring which protocol is used.
2919 */
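/*
 * Starts at the version already negotiated for the interface (or
 * RDC_VERS_MAX if none) and steps down one version at a time on
 * RPC_PROGVERSMISMATCH until RDC_VERS_MIN; a successful downgrade is
 * recorded back into ip->rpc_version under rdc_ping_lock.
 */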
2920 int
2921 rdc_clnt_call_any(rdc_srv_t *svp, rdc_if_t *ip, rpcproc_t proc,
2922 xdrproc_t xargs, caddr_t argsp,
2923 xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2924 {
2925 rpcvers_t vers;
2926 int rc;
2927
2928 if (ip != NULL) {
2929 vers = ip->rpc_version;
2930 } else {
2931 vers = RDC_VERS_MAX;
2932 }
2933
2934 do {
2935 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2936 xres, resp, timeout);
2937
2938 if (rc == RPC_PROGVERSMISMATCH) {
2939 /*
2940 * Downgrade and try again.
2941 */
2942 vers--;
2943 }
2944 } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2945
2946 if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2947 mutex_enter(&rdc_ping_lock);
2948 ip->rpc_version = vers;
2949 mutex_exit(&rdc_ping_lock);
2950 }
2951
2952 return (rc);
2953 }
2954
2955 /*
2956 * Call an rpc from the client side, starting with protocol specified
2957 */
2958 int
2959 rdc_clnt_call_walk(rdc_k_info_t *krdc, rpcproc_t proc, xdrproc_t xargs,
2960 caddr_t argsp, xdrproc_t xres, caddr_t resp,
2961 struct timeval *timeout)
2962 {
2963 int rc;
2964 rpcvers_t vers;
2965 rdc_srv_t *svp = krdc->lsrv;
2966 rdc_if_t *ip = krdc->intf;
2967 vers = krdc->rpc_version;
2968
2969 do {
2970 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2971 xres, resp, timeout);
2972
2973 if (rc == RPC_PROGVERSMISMATCH) {
2974 /*
2975 * Downgrade and try again.
2976 */
2977 vers--;
2978 }
2979 } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2980
2981 if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2982 mutex_enter(&rdc_ping_lock);
2983 ip->rpc_version = vers;
2984 mutex_exit(&rdc_ping_lock);
2985 }
2986
2987 return (rc);
2988 }
2989
2990 /*
2991 * rdc_clnt_free
2992 * Free a client structure into the cache, or if this was a temporary
2993 * handle allocated above MAXCLIENTS, destroy it.
2994 */
2995 static void
2996 rdc_clnt_free(struct chtab *ch, CLIENT *clp)
2997 {
2998 if (ch != NULL) {
2999 /* cached client, just clear inuse flag and return */
3000 ASSERT(ch->ch_client == clp);
3001 ch->ch_inuse = FALSE;
3002 return;
3003 }
3004
3005 /* temporary handle allocated above MAXCLIENTS, so destroy it */
3006
3007 if (clp->cl_auth) {
3008 AUTH_DESTROY(clp->cl_auth);
3009 clp->cl_auth = 0;
3010 }
3011
3012 CLNT_DESTROY(clp);
3013 }
3014
3015
3016 /*
3017 * _rdc_clnt_destroy
3018 * Free a chain (ch_list or ch_next) of cached clients
3019 */
3020 static int
3021 _rdc_clnt_destroy(struct chtab **p, const int list)
3022 {
3023 struct chtab *ch;
3024 int leak = 0;
3025
3026 if (!p)
3027 return (0);
3028
3029 while (*p != NULL) {
3030 ch = *p;
3031
3032 /*
3033 * unlink from the chain
3034 * - this leaks the client if it was inuse
3035 */
3036
3037 *p = list ? ch->ch_list : ch->ch_next;
3038
3039 if (!ch->ch_inuse) {
3040 /* unused client - destroy it */
3041
3042 if (ch->ch_client) {
3043 if (ch->ch_client->cl_auth) {
3044 AUTH_DESTROY(ch->ch_client->cl_auth);
3045 ch->ch_client->cl_auth = 0;
3046 }
3047
3048 CLNT_DESTROY(ch->ch_client);
3049 ch->ch_client = 0;
3050 }
3051
3052 if (ch->ch_protofmly)
3053 kmem_free(ch->ch_protofmly,
3054 strlen(ch->ch_protofmly)+1);
3055
3056 kmem_free(ch, sizeof (*ch));
3057 } else {
3058 /* remember client leak */
3059 leak++;
3060 }
3061 }
3062
3063 return (leak);
3064 }
3065
3066
3067 /*
3068 * rdc_clnt_destroy
3069 * Free client caching table on unconfigure
3070 */
3071 void
3072 rdc_clnt_destroy(void)
3073 {
3074 struct chtab *ch;
3075 int leak = 0;
3076
3077 mutex_enter(&rdc_clnt_lock);
3078
3079 /* destroy each ch_list chain */
3080
3081 for (ch = rdc_chtable; ch; ch = ch->ch_next) {
3082 leak += _rdc_clnt_destroy(&ch->ch_list, 1);
3083 }
3084
3085 /* destroy the main ch_next chain */
3086 leak += _rdc_clnt_destroy(&rdc_chtable, 0);
3087
3088 if (leak) {
3089 /* we are about to leak clients */
3090 cmn_err(CE_WARN,
3091 "!rdc_clnt_destroy: leaking %d inuse clients", leak);
3092 }
3093
3094 mutex_exit(&rdc_clnt_lock);
3095 }
3096
3097 #ifdef DEBUG
3098 /*
3099 * Function to send an asynchronous net_data6 request
3100 * direct to a server to allow the generation of
3101 * out of order requests for ZatoIchi tests.
3102 */
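/*
 * The ioctl argument names the secondary host and file; the matching
 * enabled async set (RPC version 6 or later) is located, a one-FBA
 * buffer is filled with the requested pattern, and an RDCPROC_WRITE6
 * call is issued directly, bypassing the normal flusher path.
 */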
3103 int
3104 rdc_async6(void *arg, int mode, int *rvp)
3105 {
3106 int index;
3107 rdc_async6_t async6;
3108 struct net_data6 data6;
3109 rdc_k_info_t *krdc;
3110 rdc_u_info_t *urdc;
3111 char *data;
3112 int datasz;
3113 char *datap;
3114 int rc;
3115 struct timeval t;
3116 struct netwriteres netret;
3117 int i;
3118
3119 rc = 0;
3120 *rvp = 0;
3121 /*
3122 * copyin the user's arguments.
3123 */
3124 if (ddi_copyin(arg, &async6, sizeof (async6), mode) < 0) {
3125 return (EFAULT);
3126 }
3127
3128 /*
3129 * search by the secondary host and file.
3130 */
3131 mutex_enter(&rdc_conf_lock);
3132 for (index = 0; index < rdc_max_sets; index++) {
3133 urdc = &rdc_u_info[index];
3134 krdc = &rdc_k_info[index];
3135
3136 if (!IS_CONFIGURED(krdc))
3137 continue;
3138 if (!IS_ENABLED(urdc))
3139 continue;
3140 if (!IS_ASYNC(urdc))
3141 continue;
3142 if (krdc->rpc_version < RDC_VERSION6)
3143 continue;
3144
3145 if ((strncmp(urdc->secondary.intf, async6.sechost,
3146 MAX_RDC_HOST_SIZE) == 0) &&
3147 (strncmp(urdc->secondary.file, async6.secfile,
3148 NSC_MAXPATH) == 0)) {
3149 break;
3150 }
3151 }
3152 mutex_exit(&rdc_conf_lock);
3153 if (index >= rdc_max_sets) {
3154 return (ENOENT);
3155 }
3156
3157 if (async6.spos != -1) {
3158 if ((async6.spos < async6.pos) ||
3159 ((async6.spos + async6.slen) >
3160 (async6.pos + async6.len))) {
3161 cmn_err(CE_WARN, "!Sub task not within range "
3162 "start %d length %d sub start %d sub length %d",
3163 async6.pos, async6.len, async6.spos, async6.slen);
3164 return (EIO);
3165 }
3166 }
3167
3168 datasz = FBA_SIZE(1);
3169 data = kmem_alloc(datasz, KM_SLEEP);
3170 datap = data;
3171 while (datap < &data[datasz]) {
3172 /* LINTED */
3173 *datap++ = async6.pat;
3174 }
3175
3176 /*
3177 * Fill in the net databuffer prior to transmission.
3178 */
3179
3180 data6.local_cd = krdc->index;
3181 if (krdc->remote_index == -1) {
3182 cmn_err(CE_WARN, "!Remote index not known");
3183 kmem_free(data, datasz);
3184 return (EIO);
3185 } else {
3186 data6.cd = krdc->remote_index;
3187 }
3188 data6.pos = async6.pos;
3189 data6.len = async6.len;
3190 data6.flag = 0;
3191 data6.idx = async6.idx;
3192 data6.seq = async6.seq;
3193
3194 if (async6.spos == -1) {
3195 data6.sfba = async6.pos;
3196 data6.nfba = async6.len;
3197 data6.endoblk = 1;
3198
3199 } else {
3200 data6.sfba = async6.spos;
3201 data6.nfba = async6.slen;
3202 data6.endoblk = async6.endind;
3203 }
3204
3205 data6.data.data_len = datasz;
3206 data6.data.data_val = data;
3207
3208 t.tv_sec = rdc_rpc_tmout;
3209 t.tv_usec = 0;
3210
3211 netret.vecdata.vecdata_val = NULL;
3212 netret.vecdata.vecdata_len = 0;
3213
3214
3215 rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
3216 xdr_net_data6, (char *)&data6, xdr_netwriteres, (char *)&netret,
3217 &t);
3218
3219 kmem_free(data, datasz);
3220 if (rc == 0) {
3221 if (netret.result < 0) {
3222 rc = -netret.result;
3223 }
3224 cmn_err(CE_NOTE, "!async6: seq %u result %d index %d "
3225 "pendcnt %d",
3226 netret.seq, netret.result, netret.index,
3227 netret.vecdata.vecdata_len);
3228 for (i = 0; i < netret.vecdata.vecdata_len; i++) {
3229 net_pendvec_t pvec;
3230 bcopy(netret.vecdata.vecdata_val + i, &pvec,
3231 sizeof (net_pendvec_t));
3232 cmn_err(CE_NOTE, "!Seq %u pos %llu len %llu",
3233 pvec.seq, (unsigned long long)pvec.apos,
3234 (unsigned long long)pvec.alen);
3235 }
3236 if (netret.vecdata.vecdata_val)
3237 kmem_free(netret.vecdata.vecdata_val,
3238 netret.vecdata.vecdata_len *
3239 sizeof (net_pendvec_t));
3240 } else {
3241 cmn_err(CE_NOTE, "!async6: rpc call failed %d", rc);
3242 }
3243 *rvp = netret.index;
3244 return (rc);
3245 }
3246
3247 /*
3248 * Function to send a net_read6 request
3249 * direct to a server to allow the generation of
3250 * read requests.
3251 */
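/*
 * Supports RPC versions 5 and 6.  With RDC_RREAD_START set the start
 * request is issued and the server's status is returned; otherwise the
 * read result is fetched and the data copied out to readgen.data.
 */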
3252 int
3253 rdc_readgen(void *arg, int mode, int *rvp)
3254 {
3255 int index;
3256 rdc_readgen_t readgen;
3257 rdc_readgen32_t readgen32;
3258 struct rread6 read6;
3259 struct rread read5;
3260 rdc_k_info_t *krdc;
3261 int ret;
3262 struct timeval t;
3263 struct rdcrdresult rr;
3264 int err;
3265
3266 *rvp = 0;
3267 rr.rr_bufsize = 0; /* rpc data buffer length (bytes) */
3268 rr.rr_data = NULL; /* rpc data buffer */
3269 if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) {
3270 if (ddi_copyin(arg, &readgen32, sizeof (readgen32), mode)) {
3271 return (EFAULT);
3272 }
3273 (void) strncpy(readgen.sechost, readgen32.sechost,
3274 MAX_RDC_HOST_SIZE);
3275 (void) strncpy(readgen.secfile, readgen32.secfile, NSC_MAXPATH);
3276 readgen.len = readgen32.len;
3277 readgen.pos = readgen32.pos;
3278 readgen.idx = readgen32.idx;
3279 readgen.flag = readgen32.flag;
3280 readgen.data = (void *)(unsigned long)readgen32.data;
3281 readgen.rpcversion = readgen32.rpcversion;
3282 } else {
3283 if (ddi_copyin(arg, &readgen, sizeof (readgen), mode)) {
3284 return (EFAULT);
3285 }
3286 }
3287 switch (readgen.rpcversion) {
3288 case 5:
3289 case 6:
3290 break;
3291 default:
3292 return (EINVAL);
3293 }
3294
3295 mutex_enter(&rdc_conf_lock);
3296 index = rdc_lookup_byhostdev(readgen.sechost, readgen.secfile);
3297 if (index >= 0) {
3298 krdc = &rdc_k_info[index];
3299 }
3300 if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
3301 mutex_exit(&rdc_conf_lock);
3302 return (ENODEV);
3303 }
3304 /*
3305 * we should really call setbusy here.
3306 */
3307 mutex_exit(&rdc_conf_lock);
3308
3309 t.tv_sec = rdc_rpc_tmout;
3310 t.tv_usec = 0;
3311 if (krdc->remote_index == -1) {
3312 cmn_err(CE_WARN, "!Remote index not known");
3313 ret = EIO;
3314 goto out;
3315 }
3316 if (readgen.rpcversion == 6) {
3317 read6.cd = krdc->remote_index;
3318 read6.len = readgen.len;
3319 read6.pos = readgen.pos;
3320 read6.idx = readgen.idx;
3321 read6.flag = readgen.flag;
3322 } else {
3323 read5.cd = krdc->remote_index;
3324 read5.len = readgen.len;
3325 read5.pos = readgen.pos;
3326 read5.idx = readgen.idx;
3327 read5.flag = readgen.flag;
3328 }
3329
3330 if (readgen.flag & RDC_RREAD_START) {
3331 if (readgen.rpcversion == 6) {
3332 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3333 RDC_VERSION6, xdr_rread6, (char *)&read6,
3334 xdr_int, (char *)&ret, &t);
3335 } else {
3336 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3337 RDC_VERSION5, xdr_rread, (char *)&read5,
3338 xdr_int, (char *)&ret, &t);
3339 }
3340 if (err == 0) {
3341 *rvp = ret;
3342 ret = 0;
3343 } else {
3344 ret = EPROTO;
3345 }
3346 } else {
3347 if (readgen.rpcversion == 6) {
3348 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3349 RDC_VERSION6, xdr_rread6, (char *)&read6,
3350 xdr_rdresult, (char *)&rr, &t);
3351 } else {
3352 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3353 RDC_VERSION5, xdr_rread, (char *)&read5,
3354 xdr_rdresult, (char *)&rr, &t);
3355 }
3356 if (err == 0) {
3357 if (rr.rr_status != RDC_OK) {
3358 ret = EIO;
3359 goto out;
3360 }
3361 *rvp = rr.rr_bufsize;
3362 if (ddi_copyout(rr.rr_data, readgen.data,
3363 rr.rr_bufsize, mode) != 0) {
3364 ret = EFAULT;
3365 goto out;
3366 }
3367 ret = 0;
3368 } else {
3369 ret = EPROTO;
3370 goto out;
3371 }
3372 }
3373 out:
3374 if (rr.rr_data) {
3375 kmem_free(rr.rr_data, rr.rr_bufsize);
3376 }
3377 return (ret);
3378 }
3379
3380
3381 #endif
3382