/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS 0
#define	SVC_RDMA_FAIL -1

#define	SVC_CREDIT_FACTOR (0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};

#define	MAXADDRLEN	128	/* max length for address mask */
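
/*
 * Note that struct clone_rdma_data is kept directly in the clone
 * SVCXPRT's xp_p2buf[] rather than in separately allocated memory, so
 * it must fit within SVC_P2LEN; svc_rdma_kcreate() ASSERTs this.  A
 * clone-level routine recovers it with, e.g.:
 *
 *	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 */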

/*
 * Routines exported through ops vector.
 */
static bool_t	svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t	svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t	svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t	svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void		svc_rdma_kdestroy(SVCMASTERXPRT *);
static int	svc_rdma_kdup(struct svc_req *, caddr_t, int,
			struct dupreq **, bool_t *);
static void	svc_rdma_kdupdone(struct dupreq *, caddr_t,
			void (*)(), int, int);
static int32_t	*svc_rdma_kgetres(SVCXPRT *, int);
static void	svc_rdma_kfreeres(SVCXPRT *);
static void	svc_rdma_kclone_destroy(SVCXPRT *);
static void	svc_rdma_kstart(SVCMASTERXPRT *);
void		svc_rdma_kstop(SVCMASTERXPRT *);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
			caddr_t, struct rpc_msg *, bool_t, int *,
			int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
			caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
			bool_t, uint_t *);
static bool_t	rpcmsg_length(xdrproc_t,
			caddr_t,
			struct rpc_msg *, bool_t, int);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy,	/* Destroy a clone xprt */
	svc_rdma_kstart		/* Tell `ready-to-receive' to rpcmod */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

#define	RSSTAT_INCR(x)	atomic_add_64(&rdmarsstat.x.value.ui64, 1)
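
/*
 * The counters above are bumped lock-free via RSSTAT_INCR()
 * (atomic_add_64).  rdmarsstat_ptr and rdmarsstat_ndata expose the
 * raw named-kstat array and its length; the kstat itself is presumably
 * created from these two symbols by the common kRPC statistics setup
 * code elsewhere.
 */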

/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;
	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive for
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has loaded.
	 * Create a master_xprt for each plugin and make it start listening
	 * on the device; if an error is generated, record it, since we may
	 * need to shut that master_xprt down.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		xprt->xp_addrmask.maxlen =
		    xprt->xp_addrmask.len = sizeof (struct sockaddr_in);
		xprt->xp_addrmask.buf =
		    kmem_zalloc(xprt->xp_addrmask.len, KM_SLEEP);
		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_addr.s_addr =
		    (uint32_t)~0;
		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_family =
		    (ushort_t)~0;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is done only when at least one transport was
		 * successfully created.  We insert the pointer to the
		 * created RDMA master xprt into a separately maintained
		 * list.  This way we can easily reference it later to
		 * clean up when the NFS kRPC service pool is going
		 * away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if even a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;


	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt->xp_addrmask.buf, xprt->xp_addrmask.maxlen);
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for module at this port
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for each plugin.  If rdma_count is
	 * already zero set active to zero.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{
}

static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR *xdrs;
	CONN *conn;
	rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist *cl = NULL;
	struct clist *wcl = NULL;
	struct clist *cllong = NULL;

	rdma_stat status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Checking if the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}
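
	/*
	 * The fixed part of the RPC-over-RDMA header (XID, version,
	 * credit value, and the RDMA_MSG/RDMA_NOMSG operation) has been
	 * decoded above.  What remains in the header are the chunk
	 * lists supplied by the client: the read chunk list (decoded
	 * below via xdr_do_clist()), the write list, and the reply
	 * write chunk.
	 */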
	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk.  Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in the rpcib module and netid in the
	 * SVCMASTERXPRT is NULL.  Initialize the clone netid
	 * from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}
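
/*
 * Send a reply that does not fit inline.  The fully encoded reply
 * (header plus wrapped results) is built in a temporary RDMA_LONG_BUFFER,
 * laid over the client-supplied reply write chunk list (cl_reply),
 * registered, synced, and pushed to the client with RDMA_WRITE.  On
 * success, *final_len is the encoded reply length and *numchunks the
 * number of write chunks used; the caller then sends an RDMA_NOMSG
 * header that points at those chunks.
 */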
static int
svc_process_long_reply(SVCXPRT *clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	/* Round the allocation up to a 16K, 32K or 64K bucket */
	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}
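
/*
 * Compose an inline reply: allocate a SEND buffer, XDR encode the
 * reply header and (via SVCAUTH_WRAP) the results directly into it,
 * and return the encoded length through *len.  The caller sends this
 * buffer as the RDMA_MSG payload.
 */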
static int
svc_compose_rpcmsg(SVCXPRT *clone_xprt, CONN *conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR **xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    (!has_args ||
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location)))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR reply:
	 * a small directory read would fit in RPC_MSG_SZ, and that is
	 * the preference, but a larger one may not fit.
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note, this calculation is ignoring the size
	 * of the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentication,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculating the "sizeof" the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

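	/*
	 * Build the RPC-over-RDMA reply header in its own SEND buffer:
	 * XID, version, our credit grant and the response operation,
	 * followed by the (always empty) read chunk list, the write
	 * list handed to us by the client, and the reply write chunk.
	 * For RDMA_MSG replies the inline RPC reply composed above is
	 * chained on as a second send element.
	 */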
	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, actually always NULL
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}

static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Free the args if needed then XDR_DESTROY
	 */
	if (args_ptr) {
		XDR *xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}

/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}

/*
 * the dup caching routines below provide a cache of non-failure
 * transaction id's.  rpc service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	1024

/*
 * This should be appropriately scaled to MAXDUPREQS.
 */
#define	DRHASHSZ	257

#if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
#define	XIDHASH(xid)	((xid) & (DRHASHSZ - 1))
#else
#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#endif
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order;
 * rdmadrmru->dr_next is the least recently used entry.
 */
struct dupreq *rdmadrmru;
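
/*
 * A typical caller drives this cache in two steps (a sketch, not code
 * taken from any particular dispatch routine): svc_rdma_kdup() is
 * called before executing a non-idempotent request; DUP_DONE means a
 * cached reply can be copied out and resent, DUP_INPROGRESS means the
 * retransmission should be dropped, and DUP_NEW means the request
 * should be executed and its result recorded afterwards with
 * svc_rdma_kdupdone(dr, res, free_rtn, size, DUP_DONE).
 */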

/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
	bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}

/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
	int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}

/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}

bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * set iov to addr+len of first segment of first wchunk of
	 * wlist sent by client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to the (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}

/*
 * Routine to set up the read chunk lists.
 */

int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make rest of the chunks 0-len
	 */

	clist_zero_len(wcl);

	return (TRUE);
}