/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS 0
#define	SVC_RDMA_FAIL -1

#define	SVC_CREDIT_FACTOR (0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};


#define	MAXADDRLEN	128	/* max length for address mask */

/*
 * Routines exported through ops vector.
 */
static bool_t	svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t	svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t	svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t	svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void		svc_rdma_kdestroy(SVCMASTERXPRT *);
static int	svc_rdma_kdup(struct svc_req *, caddr_t, int,
			struct dupreq **, bool_t *);
static void	svc_rdma_kdupdone(struct dupreq *, caddr_t,
			void (*)(), int, int);
static int32_t	*svc_rdma_kgetres(SVCXPRT *, int);
static void	svc_rdma_kfreeres(SVCXPRT *);
static void	svc_rdma_kclone_destroy(SVCXPRT *);
static void	svc_rdma_kstart(SVCMASTERXPRT *);
void		svc_rdma_kstop(SVCMASTERXPRT *);
static void	svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
		    caddr_t, struct rpc_msg *, bool_t, int *,
		    int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
		    caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
		    bool_t, uint_t *);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy, /* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt	/* Transport specific clone xprt */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
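/*
 * Atomically bump one of the rdmarsstat counters above; the counters
 * are exported through the kstat framework via rdmarsstat_ptr.
 */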
#define	RSSTAT_INCR(x)	atomic_add_64(&rdmarsstat.x.value.ui64, 1)

/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;

	/*
	 * modload the RDMA plugins, if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has
	 * loaded.  Create a master_xprt for each plugin and make it
	 * start listening on the device; if an error occurs, record
	 * it and shut down that master_xprt.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;
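		/*
		 * An all-ones address mask: the entire client address
		 * is significant whenever an address is compared
		 * against this transport.
		 */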
		xprt->xp_addrmask.maxlen =
		    xprt->xp_addrmask.len = sizeof (struct sockaddr_in);
		xprt->xp_addrmask.buf =
		    kmem_zalloc(xprt->xp_addrmask.len, KM_SLEEP);
		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_addr.s_addr =
		    (uint32_t)~0;
		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_family =
		    (ushort_t)~0;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is set only when at least one transport has
		 * been successfully created.  We insert the pointer
		 * to the created RDMA master xprt into a separately
		 * maintained list.  This way we can easily reference
		 * it later to clean up when the NFS kRPC service pool
		 * is going away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if even a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;

	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt->xp_addrmask.buf, xprt->xp_addrmask.maxlen);
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for module at this port
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for the plugin.  If rdma_count
	 * is already zero, just mark the transport inactive.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{
	struct clone_rdma_data *cdrp;

	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}

/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}


static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR *xdrs;
	CONN *conn;
	rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist *cl = NULL;
	struct clist *wcl = NULL;
	struct clist *cllong = NULL;

	rdma_stat status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
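	/*
	 * The RPC-over-RDMA transport header precedes the RPC message
	 * proper: the XID, the protocol version, the credit grant, and
	 * the operation type (e.g. RDMA_MSG or RDMA_NOMSG).
	 */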
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Check whether the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;
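	/*
	 * Decode the reply write chunk, if the client supplied one;
	 * svc_rdma_ksend() will use it when the reply is too large to
	 * be sent inline.
	 */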
	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at offset 0 indicates that the RPC call message
	 * is in a chunk.  Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;
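	/*
	 * Note that the address buffers above point directly into the
	 * connection structure rather than into copied storage; they
	 * remain valid only while a reference to the connection is
	 * held.
	 */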
	/*
	 * In the case of RDMA, connection management is done entirely
	 * in the rpcib module, and the netid in the SVCMASTERXPRT is
	 * NULL.  Initialize the clone netid from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}

static int
svc_process_long_reply(SVCXPRT *clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	/* Round small allocations up to a 16K/32K/64K bucket */
	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024)
			alloc_len = 64 * 1024;
		else if (alloc_len > 16 * 1024)
			alloc_len = 32 * 1024;
		else
			alloc_len = 16 * 1024;
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	/*
	 * Walk the write chunk list supplied by the client, carving
	 * the encoded reply into it segment by segment.
	 */
	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make the rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);
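	/*
	 * Whether or not the RDMA_WRITE succeeded, the source
	 * registration and the long reply buffer are no longer needed.
	 */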
	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}


static int
svc_compose_rpcmsg(SVCXPRT *clone_xprt, CONN *conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR **xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it is not processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * we need to make a guess as to the overall size of the
	 * response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response (e.g. a small directory read would fit in RPC_MSG_SZ
	 * and that is the preference, but it may not fit).
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note: this calculation ignores the size of
	 * the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentication,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then we fall back to
	 * encoding for the reply chunk list.
	 */
	/*
	 * Calculate the size of the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try.
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the reply chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * Attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;
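	/*
	 * The reply transport header mirrors the call header: XID,
	 * protocol version, a fresh credit grant, and the response
	 * operation (RDMA_MSG for inline replies, RDMA_NOMSG when the
	 * reply went out via the reply chunk).
	 */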
	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip the xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, which is always NULL here.
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * Encode the write list -- we already drove the RDMA_WRITEs.
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}

static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt.  Do
	 * not free it or release the connection; it is still in use.
	 * The buffers will be freed and the connection released later
	 * by SVC_CLONE_DESTROY().
	 */
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	if (crdp->cloned == TRUE) {
		crdp->cloned = 0;
		return (TRUE);
	}

	/*
	 * Free the args if needed, then XDR_DESTROY.
	 */
	if (args_ptr) {
		XDR *xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}

/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}

/*
 * The dup caching routines below provide a cache of non-failure
 * transaction IDs.  RPC service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	1024

/*
 * This should be appropriately scaled to MAXDUPREQS.
 */
#define	DRHASHSZ	257

#if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
#define	XIDHASH(xid)	((xid) & (DRHASHSZ - 1))
#else
#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#endif
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next points to the least recently used entry.
 */
struct dupreq *rdmadrmru;

/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
	bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
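	/*
	 * A hit requires the XID, procedure, program, version, and
	 * caller address to all match.
	 */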
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, so either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}

/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
	int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}

/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}
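/*
 * rdma_get_wchunk returns, via *iov, the base address of the first
 * segment of the client's write chunk list together with the total
 * length of the list, so a caller can treat the whole list as a
 * single buffer.
 */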
bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * Set iov to addr+len of first segment of first wchunk of
	 * wlist sent by the client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to the (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}

/*
 * Routine to set up the read chunk lists.
 */
int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to a 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * Try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make the rest of the chunks 0-len
	 */
	clist_zero_len(wcl);

	return (TRUE);
}