/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS	0
#define	SVC_RDMA_FAIL		-1

#define	SVC_CREDIT_FACTOR	(0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};


#define	MAXADDRLEN	128	/* max length for address mask */
/*
 * Routines exported through ops vector.
 */
static bool_t	svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t	svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t	svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t	svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void		svc_rdma_kdestroy(SVCMASTERXPRT *);
static int	svc_rdma_kdup(struct svc_req *, caddr_t, int,
		    struct dupreq **, bool_t *);
static void	svc_rdma_kdupdone(struct dupreq *, caddr_t,
		    void (*)(), int, int);
static int32_t	*svc_rdma_kgetres(SVCXPRT *, int);
static void	svc_rdma_kfreeres(SVCXPRT *);
static void	svc_rdma_kclone_destroy(SVCXPRT *);
static void	svc_rdma_kstart(SVCMASTERXPRT *);
void		svc_rdma_kstop(SVCMASTERXPRT *);
static void	svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
static void	svc_rdma_ktattrs(SVCXPRT *, int, void **);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
		    caddr_t, struct rpc_msg *, bool_t, int *,
		    int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
		    caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
		    bool_t, uint_t *);
static bool_t	rpcmsg_length(xdrproc_t,
		    caddr_t,
		    struct rpc_msg *, bool_t, int);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy, /* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs	/* Get Transport Attributes */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
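/*
 * RSSTAT_INCR() bumps one of the server kstat counters above with an
 * atomic increment, so callers need no additional locking.
 */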
#define	RSSTAT_INCR(x)	atomic_inc_64(&rdmarsstat.x.value.ui64)
/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;
	/*
	 * Modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive for
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has loaded.
	 * Create a master_xprt and make it start listening on the device;
	 * if an error occurs, record it, as we might need to shut down
	 * the master_xprt.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_full = FALSE;
		xprt->xp_enable = FALSE;
		xprt->xp_reqs = 0;
		xprt->xp_size = 0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is done only when at least one transport was
		 * successfully created.  We insert the pointer to the
		 * created RDMA master xprt into a separately maintained
		 * list.  This way we can easily reference it later to
		 * clean up when the NFS kRPC service pool is going
		 * away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if even a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;


	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for module at this port
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine of the plugin.  If rdma_count
	 * is already zero, just mark the transport inactive.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{

	struct clone_rdma_data *cdrp;
	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}

/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}

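/*
 * Return a transport-specific attribute of the clone xprt.  Only
 * SVC_TATTR_ADDRMASK is handled: it returns a pointer to the
 * connection's address mask, or leaves *tattr NULL if there is no
 * connection.
 */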
static void
svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
{
	CONN	*conn;
	*tattr = NULL;

	switch (attrflag) {
	case SVC_TATTR_ADDRMASK:
		conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
		ASSERT(conn != NULL);
		if (conn)
			*tattr = (void *)&conn->c_addrmask;
	}
}

static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;
	struct clist	*wcl = NULL;
	struct clist	*cllong = NULL;

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Check that the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk.  Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in rpcib module and netid in the
	 * SVCMASTERXPRT is NULL.  Initialize the clone netid
	 * from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}

static int
svc_process_long_reply(SVCXPRT *clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}

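/*
 * Compose an inline RPC reply: get a pre-allocated SEND buffer,
 * encode the reply header (and, for calls with results, the wrapped
 * results) directly into it, and return the encoded length in *len.
 */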
static int
svc_compose_rpcmsg(SVCXPRT *clone_xprt, CONN *conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR **xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    (!has_args ||
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location)))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response (e.g. a small directory read would fit in RPC_MSG_SZ
	 * and that is the preference, but it may not fit).
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note: this calculation ignores the size of the
	 * encoding of the authentication overhead.  The reason for this
	 * is rooted in the complexities of access to the encoded size
	 * of RPCSEC_GSS related authentication, integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculate the "sizeof" the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip the xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, actually always NULL
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}

static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt.  Do
	 * not free, or release the connection; it is still in use.  The
	 * buffers will be freed and the connection released later by
	 * SVC_CLONE_DESTROY().
	 */
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	if (crdp->cloned == TRUE) {
		crdp->cloned = 0;
		return (TRUE);
	}

	/*
	 * Free the args if needed, then XDR_DESTROY.
	 */
	if (args_ptr) {
		XDR	*xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}

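/*
 * Pre-serialized response buffers are not used for RDMA:
 * svc_rdma_kgetres() always returns NULL and svc_rdma_kfreeres()
 * is a no-op.
 */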
/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}

/*
 * The dup caching routines below provide a cache of non-failure
 * transaction ids.  RPC service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	1024

/*
 * This should be appropriately scaled to MAXDUPREQS.
 */
#define	DRHASHSZ	257

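/*
 * XIDHASH() uses a cheap mask when DRHASHSZ is a power of two, and
 * falls back to a modulo otherwise (DRHASHSZ is 257, a prime, so the
 * modulo form is compiled here).
 */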
#if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
#define	XIDHASH(xid)	((xid) & (DRHASHSZ - 1))
#else
#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#endif
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next is the least recently used entry.
 */
struct dupreq *rdmadrmru;

/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
    bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}

/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
    int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}

/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}

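/*
 * Return in *iov the start address of the first segment of the
 * client's write list and the total length of all its segments.
 * Fails if the request did not arrive over RDMA.
 */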
bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * Set iov to addr+len of first segment of first wchunk of
	 * wlist sent by client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to the (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}

/*
 * Routine to set up the read chunk lists.
 */
int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make rest of the chunks 0-len
	 */

	clist_zero_len(wcl);

	return (TRUE);
}