/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS	0
#define	SVC_RDMA_FAIL		-1

#define	SVC_CREDIT_FACTOR	(0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};


#define	MAXADDRLEN	128	/* max length for address mask */
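
/*
 * Note on lifetime: a clone_rdma_data lives in the clone transport's
 * preallocated private buffer (xp_p2buf); svc_rdma_kcreate() below
 * asserts that it fits in SVC_P2LEN.  It is therefore never allocated
 * or freed on its own -- only the connection and buffers it points to
 * are reference counted and released, either by svc_rdma_kfreeargs()
 * or, when the xprt has been cloned for detached thread processing,
 * by svc_rdma_kclone_destroy().
 */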

/*
 * Routines exported through ops vector.
 */
static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void			svc_rdma_kdestroy(SVCMASTERXPRT *);
static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
				struct dupreq **, bool_t *);
static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
				void (*)(), int, int);
static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
static void		svc_rdma_kfreeres(SVCXPRT *);
static void		svc_rdma_kclone_destroy(SVCXPRT *);
static void		svc_rdma_kstart(SVCMASTERXPRT *);
void			svc_rdma_kstop(SVCMASTERXPRT *);
static void		svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
static void		svc_rdma_ktattrs(SVCXPRT *, int, void **);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
			caddr_t, struct rpc_msg *, bool_t, int *,
			int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
			caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
			bool_t, uint_t *);
static bool_t	rpcmsg_length(xdrproc_t, caddr_t,
			struct rpc_msg *, bool_t, int);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy, /* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs	/* Get Transport Attributes */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

#define	RSSTAT_INCR(x)	atomic_inc_64(&rdmarsstat.x.value.ui64)
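
/*
 * A minimal sketch of how a named-stats array like rdmarsstat is
 * typically published through the kstat framework.  This is
 * illustrative only: the module/name strings below are assumptions,
 * and the real registration for these counters lives elsewhere in the
 * kRPC statistics code.
 *
 *	kstat_t *ksp;
 *
 *	ksp = kstat_create("rpcmod", 0, "svc_rdma_server", "rpc",
 *	    KSTAT_TYPE_NAMED, rdmarsstat_ndata, KSTAT_FLAG_VIRTUAL);
 *	if (ksp != NULL) {
 *		ksp->ks_data = (void *)rdmarsstat_ptr;
 *		kstat_install(ksp);
 *	}
 */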

/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;	/* initialized: the modload below may be skipped */
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;
	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive for
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has
	 * loaded.  Create a master_xprt and make it start listening on
	 * the device.  If an error is generated, record it; we might
	 * need to shut down the master_xprt.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_full = FALSE;
		xprt->xp_enable = FALSE;
		xprt->xp_reqs = 0;
		xprt->xp_size = 0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is set only when at least one transport has been
		 * successfully created.  We insert the pointer to the
		 * created RDMA master xprt into a separately maintained
		 * list.  This way we can easily reference it later to
		 * clean up when the NFS kRPC service pool is going
		 * away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if even a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}
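
/*
 * A hedged sketch of the matching teardown, assuming a caller that
 * kept the rdma_xprt_group_t filled in by svc_rdma_kcreate() (the real
 * shutdown path lives with the service pool code):
 *
 *	rdma_xprt_record_t *rec;
 *
 *	for (rec = started_xprts->rtg_listhead; rec != NULL;
 *	    rec = rec->rtr_next) {
 *		svc_rdma_kstop(rec->rtr_xprt_ptr);
 *		SVC_DESTROY(rec->rtr_xprt_ptr);
 *	}
 */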

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;


	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for module at this port
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for each plugin.  If rdma_count
	 * is already zero, set active to zero.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{
	struct clone_rdma_data *cdrp;

	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}
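
/*
 * Clone handoff in brief: when a request is detached for asynchronous
 * processing, svc_rdma_kclone_xprt() below copies this transport's
 * clone_rdma_data into the new clone and marks the source as cloned.
 * From then on the clone owns the connection reference and buffers, so
 * svc_rdma_kfreeargs() skips the release and svc_rdma_kclone_destroy()
 * above performs it instead.
 */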

/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}

static void
svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
{
	CONN	*conn;
	*tattr = NULL;

	switch (attrflag) {
	case SVC_TATTR_ADDRMASK:
		conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
		ASSERT(conn != NULL);
		if (conn)
			*tattr = (void *)&conn->c_addrmask;
	}
}
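
/*
 * svc_rdma_krecv() below handles an inbound call in four steps: post a
 * fresh receive buffer to keep the credit pipeline full, decode the
 * RPC-over-RDMA transport header, pull the call body over with an
 * RDMA READ when the client sent RDMA_NOMSG (the call is in a read
 * chunk rather than inline), and finally decode the RPC call message
 * itself.  The transport header it parses looks like:
 *
 *	+--------+---------+-------------+--------+
 *	|  xid   | version | rdma_credit |   op   |
 *	+--------+---------+-------------+--------+
 *
 * followed by the read chunk list, the write list, and the reply
 * write chunk, each XDR encoded.
 */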

static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;
	struct clist	*wcl = NULL;
	struct clist	*cllong = NULL;

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t	wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Check that the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk.  Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In the RDMA case, connection management is done entirely in
	 * the rpcib module, and the netid in the SVCMASTERXPRT is NULL.
	 * Initialize the clone netid from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}
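
/*
 * Long replies: when a response cannot fit inline, the reply body is
 * RDMA_WRITEen directly into the client-provided reply write chunk by
 * svc_process_long_reply() below.  The encode buffer is drawn from a
 * small set of power-of-two sizes; e.g. an estimated 20 KB reply is
 * encoded into a 32 KB RDMA_LONG_BUFFER, while anything over 64 KB is
 * allocated at its (XDR rounded) exact size.
 */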

static int
svc_process_long_reply(SVCXPRT *clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char  *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long RPC response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	if (alloc_len <= 16 * 1024) {
		alloc_len = 16 * 1024;
	} else if (alloc_len <= 32 * 1024) {
		alloc_len = 32 * 1024;
	} else if (alloc_len <= 64 * 1024) {
		alloc_len = 64 * 1024;
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make the rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}
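
/*
 * The two reply shapes, for reference: an RDMA_MSG reply carries the
 * whole encoded RPC reply inline with the transport header, while an
 * RDMA_NOMSG reply (built by svc_process_long_reply() above) carries
 * only the transport header inline, the body having already been
 * pushed into the client's reply write chunk.  svc_compose_rpcmsg()
 * below builds the inline variant.
 */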

static int
svc_compose_rpcmsg(SVCXPRT *clone_xprt, CONN *conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR **xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    (!has_args ||
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location)))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}
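
/*
 * The reply transport header emitted by svc_rdma_ksend() below mirrors
 * the call header parsed in svc_rdma_krecv():
 *
 *	+--------+---------+-------------+--------+
 *	|  xid   | version | rdma_credit |   op   |
 *	+--------+---------+-------------+--------+
 *
 * followed by the (always NULL) read chunk list, the echoed write
 * list, and the reply write chunk.
 */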

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response: a small directory read would fit in RPC_MSG_SZ,
	 * and that is the preference, but a large one may not fit.
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note: this calculation ignores the size of
	 * the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentication,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculate the size of the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);
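
	/*
	 * From here on the transport header is assembled in its own
	 * SEND_BUFFER.  For RDMA_MSG replies the encoded RPC reply in
	 * rbuf_rpc_resp rides along as a second send segment; for
	 * RDMA_NOMSG replies the body has already been RDMA_WRITEen
	 * into the client's reply chunk, so only the header is sent.
	 */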
	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, which is always NULL here
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}
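
/*
 * Argument lifetime: SVCAUTH_UNWRAP above decodes the arguments in
 * place over xp_xdrin, which still points into the receive (or long
 * call) buffer set up by svc_rdma_krecv().  That buffer is not
 * released until svc_rdma_kfreeargs() below runs the xdrproc in
 * XDR_FREE mode and drops the connection reference.
 */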

static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;	/* initialized: args_ptr may be NULL */

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt.  Do
	 * not free, or release the connection; it is still in use.  The
	 * buffers will be freed and the connection released later by
	 * SVC_CLONE_DESTROY().
	 */
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	if (crdp->cloned == TRUE) {
		crdp->cloned = 0;
		return (TRUE);
	}

	/*
	 * Free the args if needed then XDR_DESTROY
	 */
	if (args_ptr) {
		XDR	*xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}

/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}

/*
 * the dup caching routines below provide a cache of non-failure
 * transaction id's.  rpc service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	8192

/*
 * This should be appropriately scaled to MAXDUPREQS.  To keep
 * collisions to a minimum, it is suggested that this be a prime.
 */
#define	DRHASHSZ	2053

#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next == drlru
 */
struct dupreq *rdmadrmru;
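
/*
 * For example, with DRHASHSZ at 2053, an xid of 0x12345678 (305419896)
 * hashes to bucket 305419896 % 2053 == 1245; since the xid is
 * unchanged on retransmit, the retry lands in the same bucket as the
 * original entry and the chain walk in svc_rdma_kdup() finds it.
 */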

/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
    bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}
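
/*
 * A hedged sketch of the caller-side pattern (the real dispatch lives
 * in the common svc code, reached through the SVC_DUP/SVC_DUPDONE
 * ops vector entries):
 *
 *	struct dupreq *dr;
 *	bool_t cached;
 *
 *	switch (svc_rdma_kdup(req, res, size, &dr, &cached)) {
 *	case DUP_DONE:
 *		// retransmission: resend the cached reply
 *		break;
 *	case DUP_INPROGRESS:
 *		// another thread is serving it: drop this retry
 *		break;
 *	case DUP_NEW:
 *		// first time: do the work, then
 *		// svc_rdma_kdupdone(dr, res, resfree, size, DUP_DONE);
 *		break;
 *	case DUP_ERROR:
 *		// cache exhausted: process without caching
 *		break;
 *	}
 */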

/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
    int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}

/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}

bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * set iov to addr+len of first segment of first wchunk of
	 * wlist sent by client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}

/*
 * Routine to set up the read chunk lists
 */

int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to a 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make the rest of the chunks 0-len
	 */

	clist_zero_len(wcl);

	return (TRUE);
}
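
/*
 * Worked example of the rounding above, assuming the standard
 * BYTES_PER_XDR_UNIT of 4: for a 10-byte payload, *wcl_len becomes
 * roundup(10, 4) == 12 and round_len == 2, so two pad bytes are
 * folded into the current chunk if it has room, and otherwise
 * carried into the next chunk.
 */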