/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
 */
/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/user.h>
#include <sys/sysmacros.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/svc.h>
#include <rpc/rpc_rdma.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip6.h>

#include <nfs/nfs.h>
#include <sys/sdt.h>

#define	SVC_RDMA_SUCCESS 0
#define	SVC_RDMA_FAIL -1

#define	SVC_CREDIT_FACTOR (0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;

/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};


#define	MAXADDRLEN	128	/* max length for address mask */

/*
 * Routines exported through ops vector.
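 * (They are gathered into rdma_svc_ops below and invoked by the kRPC
 * framework through the generic SVC_*() dispatch macros.)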
 */
static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void			svc_rdma_kdestroy(SVCMASTERXPRT *);
static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
				struct dupreq **, bool_t *);
static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
				void (*)(), int, int);
static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
static void		svc_rdma_kfreeres(SVCXPRT *);
static void		svc_rdma_kclone_destroy(SVCXPRT *);
static void		svc_rdma_kstart(SVCMASTERXPRT *);
void			svc_rdma_kstop(SVCMASTERXPRT *);
static void		svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
static void		svc_rdma_ktattrs(SVCXPRT *, int, void **);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
			caddr_t, struct rpc_msg *, bool_t, int *,
			int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
			caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
			bool_t, uint_t *);
static bool_t rpcmsg_length(xdrproc_t,
		caddr_t,
		struct rpc_msg *, bool_t, int);

/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy, /* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs	/* Get Transport Attributes */
};

/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

#define	RSSTAT_INCR(x)	atomic_add_64(&rdmarsstat.x.value.ui64, 1)
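
/*
 * Illustrative expansion: a fast-path counter bump such as
 *
 *	RSSTAT_INCR(rscalls);
 *
 * (as used in svc_rdma_krecv() below) becomes
 *
 *	atomic_add_64(&rdmarsstat.rscalls.value.ui64, 1);
 *
 * so the statistics above can be updated from concurrent service
 * threads without taking a lock.
 */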
/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error = 0;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;
	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
		}
		mutex_exit(&rdma_modload_lock);

		if (error)
			return (error);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has
	 * loaded.  Create a master_xprt and make it start listening on
	 * the device; if an error is generated, record it, since we may
	 * need to shut the master_xprt down.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is reached only when a transport was successfully
		 * created.  We insert the pointer to the created RDMA
		 * master xprt into a separately maintained list.  This
		 * way we can easily reference it later for cleanup, when
		 * the NFS kRPC service pool is going away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return an error if at least one plugin was started
	 * successfully.
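	 * (If rtg_count is zero, every attempt failed and each partially
	 * started transport has already been torn down via SVC_DESTROY()
	 * above, so the last recorded error is returned.)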
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}

/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate().
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;


	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt, sizeof (*xprt));
}


static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for the module at this port.
	 */

	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}

void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for the plugin.  If rdma_count
	 * is already zero, set active to zero.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}

/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{

	struct clone_rdma_data *cdrp;
	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release the connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}

/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
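 * (svc_rdma_kclone_xprt() below sets the `cloned' flag on the source
 * xprt; svc_rdma_kfreeargs() then skips freeing the shared buffers and
 * connection so that svc_rdma_kclone_destroy() above can release them
 * exactly once.)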
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}

static void
svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
{
	CONN	*conn;
	*tattr = NULL;

	switch (attrflag) {
	case SVC_TATTR_ADDRMASK:
		conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
		ASSERT(conn != NULL);
		if (conn)
			*tattr = (void *)&conn->c_addrmask;
	}
}
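
/*
 * Layout sketch of an inbound RPC/RDMA message as decoded by
 * svc_rdma_krecv() below.  This mirrors the decode order in the code
 * (one 32-bit XDR word each, followed by the chunk structures); it is
 * an illustration, not an authoritative wire specification:
 *
 *	xid | vers | credits | op (RDMA_MSG or RDMA_NOMSG)
 *	read chunk list | write list | reply write chunk
 *	[inline RPC call message, when op == RDMA_MSG]
 *
 * With RDMA_NOMSG the call body is not inline; it is pulled over with
 * RDMA_READ from the chunk found at offset 0 of the read chunk list.
 */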
static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;
	struct clist	*wcl = NULL;
	struct clist	*cllong = NULL;

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Check that the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk.  Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t	flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t	flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in the rpcib module and netid in the
	 * SVCMASTERXPRT is NULL.  Initialize the clone netid
	 * from the connection.
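	 * (The connection is owned by the plugin, so conn->c_netid is
	 * presumably whatever rpcib recorded when it established the
	 * connection.)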
604 */ 605 606 clone_xprt->xp_netid = conn->c_netid; 607 608 clone_xprt->xp_xid = xid; 609 crdp->conn = conn; 610 611 freeb(mp); 612 613 return (TRUE); 614 615 callmsg_err: 616 rdma_buf_free(conn, &crdp->rpcbuf); 617 618 cll_malloc_err: 619 if (cl) 620 clist_free(cl); 621 xdr_err: 622 XDR_DESTROY(xdrs); 623 624 badrpc_call: 625 RDMA_BUF_FREE(conn, &rdp->rpcmsg); 626 RDMA_REL_CONN(conn); 627 freeb(mp); 628 RSSTAT_INCR(rsbadcalls); 629 return (FALSE); 630 } 631 632 static int 633 svc_process_long_reply(SVCXPRT * clone_xprt, 634 xdrproc_t xdr_results, caddr_t xdr_location, 635 struct rpc_msg *msg, bool_t has_args, int *msglen, 636 int *freelen, int *numchunks, unsigned int *final_len) 637 { 638 int status; 639 XDR xdrslong; 640 struct clist *wcl = NULL; 641 int count = 0; 642 int alloc_len; 643 char *memp; 644 rdma_buf_t long_rpc = {0}; 645 struct clone_rdma_data *crdp; 646 647 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 648 649 bzero(&xdrslong, sizeof (xdrslong)); 650 651 /* Choose a size for the long rpc response */ 652 if (MSG_IS_RPCSEC_GSS(msg)) { 653 alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen); 654 } else { 655 alloc_len = RNDUP(*msglen); 656 } 657 658 if (alloc_len <= 64 * 1024) { 659 if (alloc_len > 32 * 1024) { 660 alloc_len = 64 * 1024; 661 } else { 662 if (alloc_len > 16 * 1024) { 663 alloc_len = 32 * 1024; 664 } else { 665 alloc_len = 16 * 1024; 666 } 667 } 668 } 669 670 long_rpc.type = RDMA_LONG_BUFFER; 671 long_rpc.len = alloc_len; 672 if (rdma_buf_alloc(crdp->conn, &long_rpc)) { 673 return (SVC_RDMA_FAIL); 674 } 675 676 memp = long_rpc.addr; 677 xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE); 678 679 msg->rm_xid = clone_xprt->xp_xid; 680 681 if (!(xdr_replymsg(&xdrslong, msg) && 682 (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong, 683 xdr_results, xdr_location)))) { 684 rdma_buf_free(crdp->conn, &long_rpc); 685 DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap); 686 return (SVC_RDMA_FAIL); 687 } 688 689 *final_len = XDR_GETPOS(&xdrslong); 690 691 DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len); 692 *numchunks = 0; 693 *freelen = 0; 694 695 wcl = crdp->cl_reply; 696 wcl->rb_longbuf = long_rpc; 697 698 count = *final_len; 699 while ((wcl != NULL) && (count > 0)) { 700 701 if (wcl->c_dmemhandle.mrc_rmr == 0) 702 break; 703 704 DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count, 705 uint32_t, wcl->c_len); 706 707 if (wcl->c_len > count) { 708 wcl->c_len = count; 709 } 710 wcl->w.c_saddr3 = (caddr_t)memp; 711 712 count -= wcl->c_len; 713 *numchunks += 1; 714 memp += wcl->c_len; 715 wcl = wcl->c_next; 716 } 717 718 /* 719 * Make rest of the chunks 0-len 720 */ 721 while (wcl != NULL) { 722 if (wcl->c_dmemhandle.mrc_rmr == 0) 723 break; 724 wcl->c_len = 0; 725 wcl = wcl->c_next; 726 } 727 728 wcl = crdp->cl_reply; 729 730 /* 731 * MUST fail if there are still more data 732 */ 733 if (count > 0) { 734 rdma_buf_free(crdp->conn, &long_rpc); 735 DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist); 736 return (SVC_RDMA_FAIL); 737 } 738 739 if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) { 740 rdma_buf_free(crdp->conn, &long_rpc); 741 DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg); 742 return (SVC_RDMA_FAIL); 743 } 744 745 status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE); 746 747 if (status) { 748 (void) clist_deregister(crdp->conn, wcl); 749 rdma_buf_free(crdp->conn, &long_rpc); 750 DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem); 751 return (SVC_RDMA_FAIL); 752 } 753 754 status = RDMA_WRITE(crdp->conn, wcl, WAIT); 755 
	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}


static int
svc_compose_rpcmsg(SVCXPRT *clone_xprt, CONN *conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR **xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for the rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}

/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t  final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response: a small directory read fits in RPC_MSG_SZ, which
	 * is the preference, but a large one may not.
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
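	 * (msglen below is exactly that combination: xdr_sizeof() of the
	 * reply header plus xdrrdma_sizeof() of the results.)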
869 * 870 * One important note, this calculation is ignoring the size 871 * of the encoding of the authentication overhead. The reason 872 * for this is rooted in the complexities of access to the 873 * encoded size of RPCSEC_GSS related authentiation, 874 * integrity, and privacy. 875 * 876 * If it turns out that the encoded authentication bumps the 877 * response over the RPC_MSG_SZ limit, then it may need to 878 * attempt to encode for the reply chunk list. 879 */ 880 881 /* 882 * Calculating the "sizeof" the RPC response header and the 883 * encoded results. 884 */ 885 msglen = xdr_sizeof(xdr_replymsg, msg); 886 887 if (msglen > 0) { 888 RSSTAT_INCR(rstotalreplies); 889 } 890 if (has_args) 891 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 892 rdma_minchunk, NULL, NULL); 893 894 DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen); 895 896 status = SVC_RDMA_SUCCESS; 897 898 if (msglen < RPC_MSG_SZ) { 899 /* 900 * Looks like the response will fit in the inline 901 * response; let's try 902 */ 903 RSSTAT_INCR(rstotalinlinereplies); 904 905 rdma_response_op = RDMA_MSG; 906 907 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results, 908 xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg, 909 has_args, &final_resp_len); 910 911 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status, 912 int, status); 913 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len, 914 int, final_resp_len); 915 916 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) { 917 clist_free(crdp->cl_reply); 918 crdp->cl_reply = NULL; 919 } 920 } 921 922 /* 923 * If the encode failed (size?) or the message really is 924 * larger than what is allowed, try the response chunk list. 925 */ 926 if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) { 927 /* 928 * attempting to use a reply chunk list when there 929 * isn't one won't get very far... 930 */ 931 if (crdp->cl_reply == NULL) { 932 DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl); 933 goto out; 934 } 935 936 RSSTAT_INCR(rstotallongreplies); 937 938 msglen = xdr_sizeof(xdr_replymsg, msg); 939 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0, 940 NULL, NULL); 941 942 status = svc_process_long_reply(clone_xprt, xdr_results, 943 xdr_location, msg, has_args, &msglen, &freelen, 944 &num_wreply_segments, &final_resp_len); 945 946 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen, 947 int, final_resp_len); 948 949 if (status != SVC_RDMA_SUCCESS) { 950 DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed); 951 goto out; 952 } 953 954 rdma_response_op = RDMA_NOMSG; 955 } 956 957 DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len, 958 int, final_resp_len); 959 960 rbuf_resp.type = SEND_BUFFER; 961 if (rdma_buf_alloc(conn, &rbuf_resp)) { 962 rdma_buf_free(conn, &rbuf_rpc_resp); 963 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs); 964 goto out; 965 } 966 967 rdma_credit = rdma_bufs_granted; 968 969 vers = RPCRDMA_VERS; 970 xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE); 971 (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid; 972 /* Skip xid and set the xdr position accordingly. 
	 */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, which is always NULL here
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * Encode the write list -- we already drove the RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}

/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}

static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt.  Do
	 * not free, or release the connection; it is still in use.  The
	 * buffers will be freed and the connection released later by
	 * SVC_CLONE_DESTROY().
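	 * (See svc_rdma_kclone_xprt(), which sets the flag, and
	 * svc_rdma_kclone_destroy(), which performs the deferred cleanup.)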
1085 */ 1086 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 1087 if (crdp->cloned == TRUE) { 1088 crdp->cloned = 0; 1089 return (TRUE); 1090 } 1091 1092 /* 1093 * Free the args if needed then XDR_DESTROY 1094 */ 1095 if (args_ptr) { 1096 XDR *xdrs = &clone_xprt->xp_xdrin; 1097 1098 xdrs->x_op = XDR_FREE; 1099 retval = (*xdr_args)(xdrs, args_ptr); 1100 } 1101 1102 XDR_DESTROY(&(clone_xprt->xp_xdrin)); 1103 rdma_buf_free(crdp->conn, &crdp->rpcbuf); 1104 if (crdp->cl_reply) { 1105 clist_free(crdp->cl_reply); 1106 crdp->cl_reply = NULL; 1107 } 1108 RDMA_REL_CONN(crdp->conn); 1109 1110 return (retval); 1111 } 1112 1113 /* ARGSUSED */ 1114 static int32_t * 1115 svc_rdma_kgetres(SVCXPRT *clone_xprt, int size) 1116 { 1117 return (NULL); 1118 } 1119 1120 /* ARGSUSED */ 1121 static void 1122 svc_rdma_kfreeres(SVCXPRT *clone_xprt) 1123 { 1124 } 1125 1126 /* 1127 * the dup cacheing routines below provide a cache of non-failure 1128 * transaction id's. rpc service routines can use this to detect 1129 * retransmissions and re-send a non-failure response. 1130 */ 1131 1132 /* 1133 * MAXDUPREQS is the number of cached items. It should be adjusted 1134 * to the service load so that there is likely to be a response entry 1135 * when the first retransmission comes in. 1136 */ 1137 #define MAXDUPREQS 1024 1138 1139 /* 1140 * This should be appropriately scaled to MAXDUPREQS. 1141 */ 1142 #define DRHASHSZ 257 1143 1144 #if ((DRHASHSZ & (DRHASHSZ - 1)) == 0) 1145 #define XIDHASH(xid) ((xid) & (DRHASHSZ - 1)) 1146 #else 1147 #define XIDHASH(xid) ((xid) % DRHASHSZ) 1148 #endif 1149 #define DRHASH(dr) XIDHASH((dr)->dr_xid) 1150 #define REQTOXID(req) ((req)->rq_xprt->xp_xid) 1151 1152 static int rdmandupreqs = 0; 1153 int rdmamaxdupreqs = MAXDUPREQS; 1154 static kmutex_t rdmadupreq_lock; 1155 static struct dupreq *rdmadrhashtbl[DRHASHSZ]; 1156 static int rdmadrhashstat[DRHASHSZ]; 1157 1158 static void unhash(struct dupreq *); 1159 1160 /* 1161 * rdmadrmru points to the head of a circular linked list in lru order. 1162 * rdmadrmru->dr_next == drlru 1163 */ 1164 struct dupreq *rdmadrmru; 1165 1166 /* 1167 * svc_rdma_kdup searches the request cache and returns 0 if the 1168 * request is not found in the cache. If it is found, then it 1169 * returns the state of the request (in progress or done) and 1170 * the status or attributes that were part of the original reply. 1171 */ 1172 static int 1173 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp, 1174 bool_t *dupcachedp) 1175 { 1176 struct dupreq *dr; 1177 uint32_t xid; 1178 uint32_t drhash; 1179 int status; 1180 1181 xid = REQTOXID(req); 1182 mutex_enter(&rdmadupreq_lock); 1183 RSSTAT_INCR(rsdupchecks); 1184 /* 1185 * Check to see whether an entry already exists in the cache. 
1186 */ 1187 dr = rdmadrhashtbl[XIDHASH(xid)]; 1188 while (dr != NULL) { 1189 if (dr->dr_xid == xid && 1190 dr->dr_proc == req->rq_proc && 1191 dr->dr_prog == req->rq_prog && 1192 dr->dr_vers == req->rq_vers && 1193 dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len && 1194 bcmp((caddr_t)dr->dr_addr.buf, 1195 (caddr_t)req->rq_xprt->xp_rtaddr.buf, 1196 dr->dr_addr.len) == 0) { 1197 status = dr->dr_status; 1198 if (status == DUP_DONE) { 1199 bcopy(dr->dr_resp.buf, res, size); 1200 if (dupcachedp != NULL) 1201 *dupcachedp = (dr->dr_resfree != NULL); 1202 } else { 1203 dr->dr_status = DUP_INPROGRESS; 1204 *drpp = dr; 1205 } 1206 RSSTAT_INCR(rsdupreqs); 1207 mutex_exit(&rdmadupreq_lock); 1208 return (status); 1209 } 1210 dr = dr->dr_chain; 1211 } 1212 1213 /* 1214 * There wasn't an entry, either allocate a new one or recycle 1215 * an old one. 1216 */ 1217 if (rdmandupreqs < rdmamaxdupreqs) { 1218 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP); 1219 if (dr == NULL) { 1220 mutex_exit(&rdmadupreq_lock); 1221 return (DUP_ERROR); 1222 } 1223 dr->dr_resp.buf = NULL; 1224 dr->dr_resp.maxlen = 0; 1225 dr->dr_addr.buf = NULL; 1226 dr->dr_addr.maxlen = 0; 1227 if (rdmadrmru) { 1228 dr->dr_next = rdmadrmru->dr_next; 1229 rdmadrmru->dr_next = dr; 1230 } else { 1231 dr->dr_next = dr; 1232 } 1233 rdmandupreqs++; 1234 } else { 1235 dr = rdmadrmru->dr_next; 1236 while (dr->dr_status == DUP_INPROGRESS) { 1237 dr = dr->dr_next; 1238 if (dr == rdmadrmru->dr_next) { 1239 mutex_exit(&rdmadupreq_lock); 1240 return (DUP_ERROR); 1241 } 1242 } 1243 unhash(dr); 1244 if (dr->dr_resfree) { 1245 (*dr->dr_resfree)(dr->dr_resp.buf); 1246 } 1247 } 1248 dr->dr_resfree = NULL; 1249 rdmadrmru = dr; 1250 1251 dr->dr_xid = REQTOXID(req); 1252 dr->dr_prog = req->rq_prog; 1253 dr->dr_vers = req->rq_vers; 1254 dr->dr_proc = req->rq_proc; 1255 if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) { 1256 if (dr->dr_addr.buf != NULL) 1257 kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen); 1258 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len; 1259 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP); 1260 if (dr->dr_addr.buf == NULL) { 1261 dr->dr_addr.maxlen = 0; 1262 dr->dr_status = DUP_DROP; 1263 mutex_exit(&rdmadupreq_lock); 1264 return (DUP_ERROR); 1265 } 1266 } 1267 dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len; 1268 bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len); 1269 if (dr->dr_resp.maxlen < size) { 1270 if (dr->dr_resp.buf != NULL) 1271 kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen); 1272 dr->dr_resp.maxlen = (unsigned int)size; 1273 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP); 1274 if (dr->dr_resp.buf == NULL) { 1275 dr->dr_resp.maxlen = 0; 1276 dr->dr_status = DUP_DROP; 1277 mutex_exit(&rdmadupreq_lock); 1278 return (DUP_ERROR); 1279 } 1280 } 1281 dr->dr_status = DUP_INPROGRESS; 1282 1283 drhash = (uint32_t)DRHASH(dr); 1284 dr->dr_chain = rdmadrhashtbl[drhash]; 1285 rdmadrhashtbl[drhash] = dr; 1286 rdmadrhashstat[drhash]++; 1287 mutex_exit(&rdmadupreq_lock); 1288 *drpp = dr; 1289 return (DUP_NEW); 1290 } 1291 1292 /* 1293 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP) 1294 * and stores the response. 
1295 */ 1296 static void 1297 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(), 1298 int size, int status) 1299 { 1300 ASSERT(dr->dr_resfree == NULL); 1301 if (status == DUP_DONE) { 1302 bcopy(res, dr->dr_resp.buf, size); 1303 dr->dr_resfree = dis_resfree; 1304 } 1305 dr->dr_status = status; 1306 } 1307 1308 /* 1309 * This routine expects that the mutex, rdmadupreq_lock, is already held. 1310 */ 1311 static void 1312 unhash(struct dupreq *dr) 1313 { 1314 struct dupreq *drt; 1315 struct dupreq *drtprev = NULL; 1316 uint32_t drhash; 1317 1318 ASSERT(MUTEX_HELD(&rdmadupreq_lock)); 1319 1320 drhash = (uint32_t)DRHASH(dr); 1321 drt = rdmadrhashtbl[drhash]; 1322 while (drt != NULL) { 1323 if (drt == dr) { 1324 rdmadrhashstat[drhash]--; 1325 if (drtprev == NULL) { 1326 rdmadrhashtbl[drhash] = drt->dr_chain; 1327 } else { 1328 drtprev->dr_chain = drt->dr_chain; 1329 } 1330 return; 1331 } 1332 drtprev = drt; 1333 drt = drt->dr_chain; 1334 } 1335 } 1336 1337 bool_t 1338 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist) 1339 { 1340 struct clist *clist; 1341 uint32_t tlen; 1342 1343 if (req->rq_xprt->xp_type != T_RDMA) { 1344 return (FALSE); 1345 } 1346 1347 tlen = 0; 1348 clist = wlist; 1349 while (clist) { 1350 tlen += clist->c_len; 1351 clist = clist->c_next; 1352 } 1353 1354 /* 1355 * set iov to addr+len of first segment of first wchunk of 1356 * wlist sent by client. krecv() already malloc'd a buffer 1357 * large enough, but registration is deferred until we write 1358 * the buffer back to (NFS) client using RDMA_WRITE. 1359 */ 1360 iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr; 1361 iov->iov_len = tlen; 1362 1363 return (TRUE); 1364 } 1365 1366 /* 1367 * routine to setup the read chunk lists 1368 */ 1369 1370 int 1371 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len) 1372 { 1373 int data_len, avail_len; 1374 uint_t round_len; 1375 1376 data_len = avail_len = 0; 1377 1378 while (wcl != NULL && count > 0) { 1379 if (wcl->c_dmemhandle.mrc_rmr == 0) 1380 break; 1381 1382 if (wcl->c_len < count) { 1383 data_len += wcl->c_len; 1384 avail_len = 0; 1385 } else { 1386 data_len += count; 1387 avail_len = wcl->c_len - count; 1388 wcl->c_len = count; 1389 } 1390 count -= wcl->c_len; 1391 1392 if (count == 0) 1393 break; 1394 1395 wcl = wcl->c_next; 1396 } 1397 1398 /* 1399 * MUST fail if there are still more data 1400 */ 1401 if (count > 0) { 1402 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len, 1403 int, data_len, int, count); 1404 return (FALSE); 1405 } 1406 1407 /* 1408 * Round up the last chunk to 4-byte boundary 1409 */ 1410 *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT); 1411 round_len = *wcl_len - data_len; 1412 1413 if (round_len) { 1414 1415 /* 1416 * If there is space in the current chunk, 1417 * add the roundup to the chunk. 1418 */ 1419 if (avail_len >= round_len) { 1420 wcl->c_len += round_len; 1421 } else { 1422 /* 1423 * try the next one. 1424 */ 1425 wcl = wcl->c_next; 1426 if ((wcl == NULL) || (wcl->c_len < round_len)) { 1427 DTRACE_PROBE1( 1428 krpc__e__rdma_setup_read_chunks_rndup, 1429 int, round_len); 1430 return (FALSE); 1431 } 1432 wcl->c_len = round_len; 1433 } 1434 } 1435 1436 wcl = wcl->c_next; 1437 1438 /* 1439 * Make rest of the chunks 0-len 1440 */ 1441 1442 clist_zero_len(wcl); 1443 1444 return (TRUE); 1445 } 1446