1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved. 23 * Copyright (c) 2012 by Delphix. All rights reserved. 24 * Copyright 2013 Nexenta Systems, Inc. All rights reserved. 25 * Copyright 2012 Marcel Telka <marcel@telka.sk> 26 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association. 27 */ 28 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */ 29 /* All Rights Reserved */ 30 /* 31 * Portions of this source code were derived from Berkeley 32 * 4.3 BSD under license from the Regents of the University of 33 * California. 34 */ 35 36 /* 37 * Server side of RPC over RDMA in the kernel. 38 */ 39 40 #include <sys/param.h> 41 #include <sys/types.h> 42 #include <sys/user.h> 43 #include <sys/sysmacros.h> 44 #include <sys/proc.h> 45 #include <sys/file.h> 46 #include <sys/errno.h> 47 #include <sys/kmem.h> 48 #include <sys/debug.h> 49 #include <sys/systm.h> 50 #include <sys/cmn_err.h> 51 #include <sys/kstat.h> 52 #include <sys/vtrace.h> 53 #include <sys/debug.h> 54 55 #include <rpc/types.h> 56 #include <rpc/xdr.h> 57 #include <rpc/auth.h> 58 #include <rpc/clnt.h> 59 #include <rpc/rpc_msg.h> 60 #include <rpc/svc.h> 61 #include <rpc/rpc_rdma.h> 62 #include <sys/ddi.h> 63 #include <sys/sunddi.h> 64 65 #include <inet/common.h> 66 #include <inet/ip.h> 67 #include <inet/ip6.h> 68 69 #include <nfs/nfs.h> 70 #include <sys/sdt.h> 71 72 #define SVC_RDMA_SUCCESS 0 73 #define SVC_RDMA_FAIL -1 74 75 #define SVC_CREDIT_FACTOR (0.5) 76 77 #define MSG_IS_RPCSEC_GSS(msg) \ 78 ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS) 79 80 81 uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT; 82 83 /* 84 * RDMA transport specific data associated with SVCMASTERXPRT 85 */ 86 struct rdma_data { 87 SVCMASTERXPRT *rd_xprt; /* back ptr to SVCMASTERXPRT */ 88 struct rdma_svc_data rd_data; /* rdma data */ 89 rdma_mod_t *r_mod; /* RDMA module containing ops ptr */ 90 }; 91 92 /* 93 * Plugin connection specific data stashed away in clone SVCXPRT 94 */ 95 struct clone_rdma_data { 96 bool_t cloned; /* xprt cloned for thread processing */ 97 CONN *conn; /* RDMA connection */ 98 rdma_buf_t rpcbuf; /* RPC req/resp buffer */ 99 struct clist *cl_reply; /* reply chunk buffer info */ 100 struct clist *cl_wlist; /* write list clist */ 101 }; 102 103 104 #define MAXADDRLEN 128 /* max length for address mask */ 105 106 /* 107 * Routines exported through ops vector. 108 */ 109 static bool_t svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *); 110 static bool_t svc_rdma_ksend(SVCXPRT *, struct rpc_msg *); 111 static bool_t svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t); 112 static bool_t svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t); 113 void svc_rdma_kdestroy(SVCMASTERXPRT *); 114 static int svc_rdma_kdup(struct svc_req *, caddr_t, int, 115 struct dupreq **, bool_t *); 116 static void svc_rdma_kdupdone(struct dupreq *, caddr_t, 117 void (*)(), int, int); 118 static int32_t *svc_rdma_kgetres(SVCXPRT *, int); 119 static void svc_rdma_kfreeres(SVCXPRT *); 120 static void svc_rdma_kclone_destroy(SVCXPRT *); 121 static void svc_rdma_kstart(SVCMASTERXPRT *); 122 void svc_rdma_kstop(SVCMASTERXPRT *); 123 static void svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *); 124 static void svc_rdma_ktattrs(SVCXPRT *, int, void **); 125 126 static int svc_process_long_reply(SVCXPRT *, xdrproc_t, 127 caddr_t, struct rpc_msg *, bool_t, int *, 128 int *, int *, unsigned int *); 129 130 static int svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t, 131 caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *, 132 bool_t, uint_t *); 133 static bool_t rpcmsg_length(xdrproc_t, 134 caddr_t, 135 struct rpc_msg *, bool_t, int); 136 137 /* 138 * Server transport operations vector. 139 */ 140 struct svc_ops rdma_svc_ops = { 141 svc_rdma_krecv, /* Get requests */ 142 svc_rdma_kgetargs, /* Deserialize arguments */ 143 svc_rdma_ksend, /* Send reply */ 144 svc_rdma_kfreeargs, /* Free argument data space */ 145 svc_rdma_kdestroy, /* Destroy transport handle */ 146 svc_rdma_kdup, /* Check entry in dup req cache */ 147 svc_rdma_kdupdone, /* Mark entry in dup req cache as done */ 148 svc_rdma_kgetres, /* Get pointer to response buffer */ 149 svc_rdma_kfreeres, /* Destroy pre-serialized response header */ 150 svc_rdma_kclone_destroy, /* Destroy a clone xprt */ 151 svc_rdma_kstart, /* Tell `ready-to-receive' to rpcmod */ 152 svc_rdma_kclone_xprt, /* Transport specific clone xprt */ 153 svc_rdma_ktattrs, /* Get Transport Attributes */ 154 NULL, /* Increment transport reference count */ 155 NULL /* Decrement transport reference count */ 156 }; 157 158 /* 159 * Server statistics 160 * NOTE: This structure type is duplicated in the NFS fast path. 161 */ 162 struct { 163 kstat_named_t rscalls; 164 kstat_named_t rsbadcalls; 165 kstat_named_t rsnullrecv; 166 kstat_named_t rsbadlen; 167 kstat_named_t rsxdrcall; 168 kstat_named_t rsdupchecks; 169 kstat_named_t rsdupreqs; 170 kstat_named_t rslongrpcs; 171 kstat_named_t rstotalreplies; 172 kstat_named_t rstotallongreplies; 173 kstat_named_t rstotalinlinereplies; 174 } rdmarsstat = { 175 { "calls", KSTAT_DATA_UINT64 }, 176 { "badcalls", KSTAT_DATA_UINT64 }, 177 { "nullrecv", KSTAT_DATA_UINT64 }, 178 { "badlen", KSTAT_DATA_UINT64 }, 179 { "xdrcall", KSTAT_DATA_UINT64 }, 180 { "dupchecks", KSTAT_DATA_UINT64 }, 181 { "dupreqs", KSTAT_DATA_UINT64 }, 182 { "longrpcs", KSTAT_DATA_UINT64 }, 183 { "totalreplies", KSTAT_DATA_UINT64 }, 184 { "totallongreplies", KSTAT_DATA_UINT64 }, 185 { "totalinlinereplies", KSTAT_DATA_UINT64 }, 186 }; 187 188 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat; 189 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t); 190 191 #define RSSTAT_INCR(x) atomic_inc_64(&rdmarsstat.x.value.ui64) 192 /* 193 * Create a transport record. 194 * The transport record, output buffer, and private data structure 195 * are allocated. The output buffer is serialized into using xdrmem. 196 * There is one transport record per user process which implements a 197 * set of services. 198 */ 199 /* ARGSUSED */ 200 int 201 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id, 202 rdma_xprt_group_t *started_xprts) 203 { 204 int error; 205 SVCMASTERXPRT *xprt; 206 struct rdma_data *rd; 207 rdma_registry_t *rmod; 208 rdma_xprt_record_t *xprt_rec; 209 queue_t *q; 210 /* 211 * modload the RDMA plugins is not already done. 212 */ 213 if (!rdma_modloaded) { 214 /*CONSTANTCONDITION*/ 215 ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN); 216 217 mutex_enter(&rdma_modload_lock); 218 if (!rdma_modloaded) { 219 error = rdma_modload(); 220 } 221 mutex_exit(&rdma_modload_lock); 222 223 if (error) 224 return (error); 225 } 226 227 /* 228 * master_xprt_count is the count of master transport handles 229 * that were successfully created and are ready to recieve for 230 * RDMA based access. 231 */ 232 error = 0; 233 xprt_rec = NULL; 234 rw_enter(&rdma_lock, RW_READER); 235 if (rdma_mod_head == NULL) { 236 started_xprts->rtg_count = 0; 237 rw_exit(&rdma_lock); 238 if (rdma_dev_available) 239 return (EPROTONOSUPPORT); 240 else 241 return (ENODEV); 242 } 243 244 /* 245 * If we have reached here, then atleast one RDMA plugin has loaded. 246 * Create a master_xprt, make it start listenining on the device, 247 * if an error is generated, record it, we might need to shut 248 * the master_xprt. 249 * SVC_START() calls svc_rdma_kstart which calls plugin binding 250 * routines. 251 */ 252 for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) { 253 254 /* 255 * One SVCMASTERXPRT per RDMA plugin. 256 */ 257 xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP); 258 xprt->xp_ops = &rdma_svc_ops; 259 xprt->xp_sct = sct; 260 xprt->xp_type = T_RDMA; 261 mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL); 262 mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL); 263 xprt->xp_req_head = (mblk_t *)0; 264 xprt->xp_req_tail = (mblk_t *)0; 265 xprt->xp_full = FALSE; 266 xprt->xp_enable = FALSE; 267 xprt->xp_reqs = 0; 268 xprt->xp_size = 0; 269 xprt->xp_threads = 0; 270 xprt->xp_detached_threads = 0; 271 272 rd = kmem_zalloc(sizeof (*rd), KM_SLEEP); 273 xprt->xp_p2 = (caddr_t)rd; 274 rd->rd_xprt = xprt; 275 rd->r_mod = rmod->r_mod; 276 277 q = &rd->rd_data.q; 278 xprt->xp_wq = q; 279 q->q_ptr = &rd->rd_xprt; 280 xprt->xp_netid = NULL; 281 282 /* 283 * Each of the plugins will have their own Service ID 284 * to listener specific mapping, like port number for VI 285 * and service name for IB. 286 */ 287 rd->rd_data.svcid = id; 288 error = svc_xprt_register(xprt, id); 289 if (error) { 290 DTRACE_PROBE(krpc__e__svcrdma__xprt__reg); 291 goto cleanup; 292 } 293 294 SVC_START(xprt); 295 if (!rd->rd_data.active) { 296 svc_xprt_unregister(xprt); 297 error = rd->rd_data.err_code; 298 goto cleanup; 299 } 300 301 /* 302 * This is set only when there is atleast one or more 303 * transports successfully created. We insert the pointer 304 * to the created RDMA master xprt into a separately maintained 305 * list. This way we can easily reference it later to cleanup, 306 * when NFS kRPC service pool is going away/unregistered. 307 */ 308 started_xprts->rtg_count ++; 309 xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP); 310 xprt_rec->rtr_xprt_ptr = xprt; 311 xprt_rec->rtr_next = started_xprts->rtg_listhead; 312 started_xprts->rtg_listhead = xprt_rec; 313 continue; 314 cleanup: 315 SVC_DESTROY(xprt); 316 if (error == RDMA_FAILED) 317 error = EPROTONOSUPPORT; 318 } 319 320 rw_exit(&rdma_lock); 321 322 /* 323 * Don't return any error even if a single plugin was started 324 * successfully. 325 */ 326 if (started_xprts->rtg_count == 0) 327 return (error); 328 return (0); 329 } 330 331 /* 332 * Cleanup routine for freeing up memory allocated by 333 * svc_rdma_kcreate() 334 */ 335 void 336 svc_rdma_kdestroy(SVCMASTERXPRT *xprt) 337 { 338 struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2; 339 340 341 mutex_destroy(&xprt->xp_req_lock); 342 mutex_destroy(&xprt->xp_thread_lock); 343 kmem_free(rd, sizeof (*rd)); 344 kmem_free(xprt, sizeof (*xprt)); 345 } 346 347 348 static void 349 svc_rdma_kstart(SVCMASTERXPRT *xprt) 350 { 351 struct rdma_svc_data *svcdata; 352 rdma_mod_t *rmod; 353 354 svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data; 355 rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod; 356 357 /* 358 * Create a listener for module at this port 359 */ 360 361 if (rmod->rdma_count != 0) 362 (*rmod->rdma_ops->rdma_svc_listen)(svcdata); 363 else 364 svcdata->err_code = RDMA_FAILED; 365 } 366 367 void 368 svc_rdma_kstop(SVCMASTERXPRT *xprt) 369 { 370 struct rdma_svc_data *svcdata; 371 rdma_mod_t *rmod; 372 373 svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data; 374 rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod; 375 376 /* 377 * Call the stop listener routine for each plugin. If rdma_count is 378 * already zero set active to zero. 379 */ 380 if (rmod->rdma_count != 0) 381 (*rmod->rdma_ops->rdma_svc_stop)(svcdata); 382 else 383 svcdata->active = 0; 384 if (svcdata->active) 385 DTRACE_PROBE(krpc__e__svcrdma__kstop); 386 } 387 388 /* ARGSUSED */ 389 static void 390 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt) 391 { 392 393 struct clone_rdma_data *cdrp; 394 cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 395 396 /* 397 * Only free buffers and release connection when cloned is set. 398 */ 399 if (cdrp->cloned != TRUE) 400 return; 401 402 rdma_buf_free(cdrp->conn, &cdrp->rpcbuf); 403 if (cdrp->cl_reply) { 404 clist_free(cdrp->cl_reply); 405 cdrp->cl_reply = NULL; 406 } 407 RDMA_REL_CONN(cdrp->conn); 408 409 cdrp->cloned = 0; 410 } 411 412 /* 413 * Clone the xprt specific information. It will be freed by 414 * SVC_CLONE_DESTROY. 415 */ 416 static void 417 svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt) 418 { 419 struct clone_rdma_data *srcp2; 420 struct clone_rdma_data *dstp2; 421 422 srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf; 423 dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf; 424 425 if (srcp2->conn != NULL) { 426 srcp2->cloned = TRUE; 427 *dstp2 = *srcp2; 428 } 429 } 430 431 static void 432 svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr) 433 { 434 CONN *conn; 435 *tattr = NULL; 436 437 switch (attrflag) { 438 case SVC_TATTR_ADDRMASK: 439 conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn; 440 ASSERT(conn != NULL); 441 if (conn) 442 *tattr = (void *)&conn->c_addrmask; 443 } 444 } 445 446 static bool_t 447 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg) 448 { 449 XDR *xdrs; 450 CONN *conn; 451 rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr; 452 struct clone_rdma_data *crdp; 453 struct clist *cl = NULL; 454 struct clist *wcl = NULL; 455 struct clist *cllong = NULL; 456 457 rdma_stat status; 458 uint32_t vers, op, pos, xid; 459 uint32_t rdma_credit; 460 uint32_t wcl_total_length = 0; 461 bool_t wwl = FALSE; 462 463 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 464 RSSTAT_INCR(rscalls); 465 conn = rdp->conn; 466 467 status = rdma_svc_postrecv(conn); 468 if (status != RDMA_SUCCESS) { 469 DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv); 470 goto badrpc_call; 471 } 472 473 xdrs = &clone_xprt->xp_xdrin; 474 xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE); 475 xid = *(uint32_t *)rdp->rpcmsg.addr; 476 XDR_SETPOS(xdrs, sizeof (uint32_t)); 477 478 if (! xdr_u_int(xdrs, &vers) || 479 ! xdr_u_int(xdrs, &rdma_credit) || 480 ! xdr_u_int(xdrs, &op)) { 481 DTRACE_PROBE(krpc__e__svcrdma__krecv__uint); 482 goto xdr_err; 483 } 484 485 /* Checking if the status of the recv operation was normal */ 486 if (rdp->status != 0) { 487 DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status, 488 int, rdp->status); 489 goto badrpc_call; 490 } 491 492 if (! xdr_do_clist(xdrs, &cl)) { 493 DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist); 494 goto xdr_err; 495 } 496 497 if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) { 498 DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist); 499 if (cl) 500 clist_free(cl); 501 goto xdr_err; 502 } 503 crdp->cl_wlist = wcl; 504 505 crdp->cl_reply = NULL; 506 (void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply); 507 508 /* 509 * A chunk at 0 offset indicates that the RPC call message 510 * is in a chunk. Get the RPC call message chunk. 511 */ 512 if (cl != NULL && op == RDMA_NOMSG) { 513 514 /* Remove RPC call message chunk from chunklist */ 515 cllong = cl; 516 cl = cl->c_next; 517 cllong->c_next = NULL; 518 519 520 /* Allocate and register memory for the RPC call msg chunk */ 521 cllong->rb_longbuf.type = RDMA_LONG_BUFFER; 522 cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ? 523 cllong->c_len : LONG_REPLY_LEN; 524 525 if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) { 526 clist_free(cllong); 527 goto cll_malloc_err; 528 } 529 530 cllong->u.c_daddr3 = cllong->rb_longbuf.addr; 531 532 if (cllong->u.c_daddr == NULL) { 533 DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem); 534 rdma_buf_free(conn, &cllong->rb_longbuf); 535 clist_free(cllong); 536 goto cll_malloc_err; 537 } 538 539 status = clist_register(conn, cllong, CLIST_REG_DST); 540 if (status) { 541 DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg); 542 rdma_buf_free(conn, &cllong->rb_longbuf); 543 clist_free(cllong); 544 goto cll_malloc_err; 545 } 546 547 /* 548 * Now read the RPC call message in 549 */ 550 status = RDMA_READ(conn, cllong, WAIT); 551 if (status) { 552 DTRACE_PROBE(krpc__e__svcrdma__krecv__read); 553 (void) clist_deregister(conn, cllong); 554 rdma_buf_free(conn, &cllong->rb_longbuf); 555 clist_free(cllong); 556 goto cll_malloc_err; 557 } 558 559 status = clist_syncmem(conn, cllong, CLIST_REG_DST); 560 (void) clist_deregister(conn, cllong); 561 562 xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3, 563 cllong->c_len, 0, cl, XDR_DECODE, conn); 564 565 crdp->rpcbuf = cllong->rb_longbuf; 566 crdp->rpcbuf.len = cllong->c_len; 567 clist_free(cllong); 568 RDMA_BUF_FREE(conn, &rdp->rpcmsg); 569 } else { 570 pos = XDR_GETPOS(xdrs); 571 xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos, 572 rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn); 573 crdp->rpcbuf = rdp->rpcmsg; 574 575 /* Use xdrrdmablk_ops to indicate there is a read chunk list */ 576 if (cl != NULL) { 577 int32_t flg = XDR_RDMA_RLIST_REG; 578 579 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg); 580 xdrs->x_ops = &xdrrdmablk_ops; 581 } 582 } 583 584 if (crdp->cl_wlist) { 585 int32_t flg = XDR_RDMA_WLIST_REG; 586 587 XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist); 588 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg); 589 } 590 591 if (! xdr_callmsg(xdrs, msg)) { 592 DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg); 593 RSSTAT_INCR(rsxdrcall); 594 goto callmsg_err; 595 } 596 597 /* 598 * Point the remote transport address in the service_transport 599 * handle at the address in the request. 600 */ 601 clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf; 602 clone_xprt->xp_rtaddr.len = conn->c_raddr.len; 603 clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len; 604 605 clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf; 606 clone_xprt->xp_lcladdr.len = conn->c_laddr.len; 607 clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len; 608 609 /* 610 * In case of RDMA, connection management is 611 * entirely done in rpcib module and netid in the 612 * SVCMASTERXPRT is NULL. Initialize the clone netid 613 * from the connection. 614 */ 615 616 clone_xprt->xp_netid = conn->c_netid; 617 618 clone_xprt->xp_xid = xid; 619 crdp->conn = conn; 620 621 freeb(mp); 622 623 return (TRUE); 624 625 callmsg_err: 626 rdma_buf_free(conn, &crdp->rpcbuf); 627 628 cll_malloc_err: 629 if (cl) 630 clist_free(cl); 631 xdr_err: 632 XDR_DESTROY(xdrs); 633 634 badrpc_call: 635 RDMA_BUF_FREE(conn, &rdp->rpcmsg); 636 RDMA_REL_CONN(conn); 637 freeb(mp); 638 RSSTAT_INCR(rsbadcalls); 639 return (FALSE); 640 } 641 642 static int 643 svc_process_long_reply(SVCXPRT * clone_xprt, 644 xdrproc_t xdr_results, caddr_t xdr_location, 645 struct rpc_msg *msg, bool_t has_args, int *msglen, 646 int *freelen, int *numchunks, unsigned int *final_len) 647 { 648 int status; 649 XDR xdrslong; 650 struct clist *wcl = NULL; 651 int count = 0; 652 int alloc_len; 653 char *memp; 654 rdma_buf_t long_rpc = {0}; 655 struct clone_rdma_data *crdp; 656 657 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 658 659 bzero(&xdrslong, sizeof (xdrslong)); 660 661 /* Choose a size for the long rpc response */ 662 if (MSG_IS_RPCSEC_GSS(msg)) { 663 alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen); 664 } else { 665 alloc_len = RNDUP(*msglen); 666 } 667 668 if (alloc_len <= 64 * 1024) { 669 if (alloc_len > 32 * 1024) { 670 alloc_len = 64 * 1024; 671 } else { 672 if (alloc_len > 16 * 1024) { 673 alloc_len = 32 * 1024; 674 } else { 675 alloc_len = 16 * 1024; 676 } 677 } 678 } 679 680 long_rpc.type = RDMA_LONG_BUFFER; 681 long_rpc.len = alloc_len; 682 if (rdma_buf_alloc(crdp->conn, &long_rpc)) { 683 return (SVC_RDMA_FAIL); 684 } 685 686 memp = long_rpc.addr; 687 xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE); 688 689 msg->rm_xid = clone_xprt->xp_xid; 690 691 if (!(xdr_replymsg(&xdrslong, msg) && 692 (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong, 693 xdr_results, xdr_location)))) { 694 rdma_buf_free(crdp->conn, &long_rpc); 695 DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap); 696 return (SVC_RDMA_FAIL); 697 } 698 699 *final_len = XDR_GETPOS(&xdrslong); 700 701 DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len); 702 *numchunks = 0; 703 *freelen = 0; 704 705 wcl = crdp->cl_reply; 706 wcl->rb_longbuf = long_rpc; 707 708 count = *final_len; 709 while ((wcl != NULL) && (count > 0)) { 710 711 if (wcl->c_dmemhandle.mrc_rmr == 0) 712 break; 713 714 DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count, 715 uint32_t, wcl->c_len); 716 717 if (wcl->c_len > count) { 718 wcl->c_len = count; 719 } 720 wcl->w.c_saddr3 = (caddr_t)memp; 721 722 count -= wcl->c_len; 723 *numchunks += 1; 724 memp += wcl->c_len; 725 wcl = wcl->c_next; 726 } 727 728 /* 729 * Make rest of the chunks 0-len 730 */ 731 while (wcl != NULL) { 732 if (wcl->c_dmemhandle.mrc_rmr == 0) 733 break; 734 wcl->c_len = 0; 735 wcl = wcl->c_next; 736 } 737 738 wcl = crdp->cl_reply; 739 740 /* 741 * MUST fail if there are still more data 742 */ 743 if (count > 0) { 744 rdma_buf_free(crdp->conn, &long_rpc); 745 DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist); 746 return (SVC_RDMA_FAIL); 747 } 748 749 if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) { 750 rdma_buf_free(crdp->conn, &long_rpc); 751 DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg); 752 return (SVC_RDMA_FAIL); 753 } 754 755 status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE); 756 757 if (status) { 758 (void) clist_deregister(crdp->conn, wcl); 759 rdma_buf_free(crdp->conn, &long_rpc); 760 DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem); 761 return (SVC_RDMA_FAIL); 762 } 763 764 status = RDMA_WRITE(crdp->conn, wcl, WAIT); 765 766 (void) clist_deregister(crdp->conn, wcl); 767 rdma_buf_free(crdp->conn, &wcl->rb_longbuf); 768 769 if (status != RDMA_SUCCESS) { 770 DTRACE_PROBE(krpc__e__svcrdma__longrep__write); 771 return (SVC_RDMA_FAIL); 772 } 773 774 return (SVC_RDMA_SUCCESS); 775 } 776 777 778 static int 779 svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results, 780 caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs, 781 struct rpc_msg *msg, bool_t has_args, uint_t *len) 782 { 783 /* 784 * Get a pre-allocated buffer for rpc reply 785 */ 786 rpcreply->type = SEND_BUFFER; 787 if (rdma_buf_alloc(conn, rpcreply)) { 788 DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs); 789 return (SVC_RDMA_FAIL); 790 } 791 792 xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len, 793 0, NULL, XDR_ENCODE, conn); 794 795 msg->rm_xid = clone_xprt->xp_xid; 796 797 if (has_args) { 798 if (!(xdr_replymsg(*xdrs, msg) && 799 (!has_args || 800 SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs, 801 xdr_results, xdr_location)))) { 802 rdma_buf_free(conn, rpcreply); 803 DTRACE_PROBE( 804 krpc__e__svcrdma__rpcmsg__reply__authwrap1); 805 return (SVC_RDMA_FAIL); 806 } 807 } else { 808 if (!xdr_replymsg(*xdrs, msg)) { 809 rdma_buf_free(conn, rpcreply); 810 DTRACE_PROBE( 811 krpc__e__svcrdma__rpcmsg__reply__authwrap2); 812 return (SVC_RDMA_FAIL); 813 } 814 } 815 816 *len = XDR_GETPOS(*xdrs); 817 818 return (SVC_RDMA_SUCCESS); 819 } 820 821 /* 822 * Send rpc reply. 823 */ 824 static bool_t 825 svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg) 826 { 827 XDR *xdrs_rpc = &(clone_xprt->xp_xdrout); 828 XDR xdrs_rhdr; 829 CONN *conn = NULL; 830 rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0}; 831 832 struct clone_rdma_data *crdp; 833 struct clist *cl_read = NULL; 834 struct clist *cl_send = NULL; 835 struct clist *cl_write = NULL; 836 xdrproc_t xdr_results; /* results XDR encoding function */ 837 caddr_t xdr_location; /* response results pointer */ 838 839 int retval = FALSE; 840 int status, msglen, num_wreply_segments = 0; 841 uint32_t rdma_credit = 0; 842 int freelen = 0; 843 bool_t has_args; 844 uint_t final_resp_len, rdma_response_op, vers; 845 846 bzero(&xdrs_rhdr, sizeof (XDR)); 847 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 848 conn = crdp->conn; 849 850 /* 851 * If there is a result procedure specified in the reply message, 852 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP. 853 * We need to make sure it won't be processed twice, so we null 854 * it for xdr_replymsg here. 855 */ 856 has_args = FALSE; 857 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 858 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 859 if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) { 860 has_args = TRUE; 861 xdr_location = msg->acpted_rply.ar_results.where; 862 msg->acpted_rply.ar_results.proc = xdr_void; 863 msg->acpted_rply.ar_results.where = NULL; 864 } 865 } 866 867 /* 868 * Given the limit on the inline response size (RPC_MSG_SZ), 869 * there is a need to make a guess as to the overall size of 870 * the response. If the resultant size is beyond the inline 871 * size, then the server needs to use the "reply chunk list" 872 * provided by the client (if the client provided one). An 873 * example of this type of response would be a READDIR 874 * response (e.g. a small directory read would fit in RPC_MSG_SZ 875 * and that is the preference but it may not fit) 876 * 877 * Combine the encoded size and the size of the true results 878 * and then make the decision about where to encode and send results. 879 * 880 * One important note, this calculation is ignoring the size 881 * of the encoding of the authentication overhead. The reason 882 * for this is rooted in the complexities of access to the 883 * encoded size of RPCSEC_GSS related authentiation, 884 * integrity, and privacy. 885 * 886 * If it turns out that the encoded authentication bumps the 887 * response over the RPC_MSG_SZ limit, then it may need to 888 * attempt to encode for the reply chunk list. 889 */ 890 891 /* 892 * Calculating the "sizeof" the RPC response header and the 893 * encoded results. 894 */ 895 msglen = xdr_sizeof(xdr_replymsg, msg); 896 897 if (msglen > 0) { 898 RSSTAT_INCR(rstotalreplies); 899 } 900 if (has_args) 901 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 902 rdma_minchunk, NULL, NULL); 903 904 DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen); 905 906 status = SVC_RDMA_SUCCESS; 907 908 if (msglen < RPC_MSG_SZ) { 909 /* 910 * Looks like the response will fit in the inline 911 * response; let's try 912 */ 913 RSSTAT_INCR(rstotalinlinereplies); 914 915 rdma_response_op = RDMA_MSG; 916 917 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results, 918 xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg, 919 has_args, &final_resp_len); 920 921 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status, 922 int, status); 923 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len, 924 int, final_resp_len); 925 926 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) { 927 clist_free(crdp->cl_reply); 928 crdp->cl_reply = NULL; 929 } 930 } 931 932 /* 933 * If the encode failed (size?) or the message really is 934 * larger than what is allowed, try the response chunk list. 935 */ 936 if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) { 937 /* 938 * attempting to use a reply chunk list when there 939 * isn't one won't get very far... 940 */ 941 if (crdp->cl_reply == NULL) { 942 DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl); 943 goto out; 944 } 945 946 RSSTAT_INCR(rstotallongreplies); 947 948 msglen = xdr_sizeof(xdr_replymsg, msg); 949 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0, 950 NULL, NULL); 951 952 status = svc_process_long_reply(clone_xprt, xdr_results, 953 xdr_location, msg, has_args, &msglen, &freelen, 954 &num_wreply_segments, &final_resp_len); 955 956 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen, 957 int, final_resp_len); 958 959 if (status != SVC_RDMA_SUCCESS) { 960 DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed); 961 goto out; 962 } 963 964 rdma_response_op = RDMA_NOMSG; 965 } 966 967 DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len, 968 int, final_resp_len); 969 970 rbuf_resp.type = SEND_BUFFER; 971 if (rdma_buf_alloc(conn, &rbuf_resp)) { 972 rdma_buf_free(conn, &rbuf_rpc_resp); 973 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs); 974 goto out; 975 } 976 977 rdma_credit = rdma_bufs_granted; 978 979 vers = RPCRDMA_VERS; 980 xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE); 981 (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid; 982 /* Skip xid and set the xdr position accordingly. */ 983 XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t)); 984 if (!xdr_u_int(&xdrs_rhdr, &vers) || 985 !xdr_u_int(&xdrs_rhdr, &rdma_credit) || 986 !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) { 987 rdma_buf_free(conn, &rbuf_rpc_resp); 988 rdma_buf_free(conn, &rbuf_resp); 989 DTRACE_PROBE(krpc__e__svcrdma__ksend__uint); 990 goto out; 991 } 992 993 /* 994 * Now XDR the read chunk list, actually always NULL 995 */ 996 (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read); 997 998 /* 999 * encode write list -- we already drove RDMA_WRITEs 1000 */ 1001 cl_write = crdp->cl_wlist; 1002 if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) { 1003 DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist); 1004 rdma_buf_free(conn, &rbuf_rpc_resp); 1005 rdma_buf_free(conn, &rbuf_resp); 1006 goto out; 1007 } 1008 1009 /* 1010 * XDR encode the RDMA_REPLY write chunk 1011 */ 1012 if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply, 1013 num_wreply_segments)) { 1014 rdma_buf_free(conn, &rbuf_rpc_resp); 1015 rdma_buf_free(conn, &rbuf_resp); 1016 goto out; 1017 } 1018 1019 clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle, 1020 rbuf_resp.addr, NULL, NULL); 1021 1022 if (rdma_response_op == RDMA_MSG) { 1023 clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle, 1024 rbuf_rpc_resp.addr, NULL, NULL); 1025 } 1026 1027 status = RDMA_SEND(conn, cl_send, msg->rm_xid); 1028 1029 if (status == RDMA_SUCCESS) { 1030 retval = TRUE; 1031 } 1032 1033 out: 1034 /* 1035 * Free up sendlist chunks 1036 */ 1037 if (cl_send != NULL) 1038 clist_free(cl_send); 1039 1040 /* 1041 * Destroy private data for xdr rdma 1042 */ 1043 if (clone_xprt->xp_xdrout.x_ops != NULL) { 1044 XDR_DESTROY(&(clone_xprt->xp_xdrout)); 1045 } 1046 1047 if (crdp->cl_reply) { 1048 clist_free(crdp->cl_reply); 1049 crdp->cl_reply = NULL; 1050 } 1051 1052 /* 1053 * This is completely disgusting. If public is set it is 1054 * a pointer to a structure whose first field is the address 1055 * of the function to free that structure and any related 1056 * stuff. (see rrokfree in nfs_xdr.c). 1057 */ 1058 if (xdrs_rpc->x_public) { 1059 /* LINTED pointer alignment */ 1060 (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public); 1061 } 1062 1063 if (xdrs_rhdr.x_ops != NULL) { 1064 XDR_DESTROY(&xdrs_rhdr); 1065 } 1066 1067 return (retval); 1068 } 1069 1070 /* 1071 * Deserialize arguments. 1072 */ 1073 static bool_t 1074 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr) 1075 { 1076 if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin, 1077 xdr_args, args_ptr)) != TRUE) 1078 return (FALSE); 1079 return (TRUE); 1080 } 1081 1082 static bool_t 1083 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, 1084 caddr_t args_ptr) 1085 { 1086 struct clone_rdma_data *crdp; 1087 bool_t retval; 1088 1089 /* 1090 * If the cloned bit is true, then this transport specific 1091 * rmda data has been duplicated into another cloned xprt. Do 1092 * not free, or release the connection, it is still in use. The 1093 * buffers will be freed and the connection released later by 1094 * SVC_CLONE_DESTROY(). 1095 */ 1096 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf; 1097 if (crdp->cloned == TRUE) { 1098 crdp->cloned = 0; 1099 return (TRUE); 1100 } 1101 1102 /* 1103 * Free the args if needed then XDR_DESTROY 1104 */ 1105 if (args_ptr) { 1106 XDR *xdrs = &clone_xprt->xp_xdrin; 1107 1108 xdrs->x_op = XDR_FREE; 1109 retval = (*xdr_args)(xdrs, args_ptr); 1110 } 1111 1112 XDR_DESTROY(&(clone_xprt->xp_xdrin)); 1113 rdma_buf_free(crdp->conn, &crdp->rpcbuf); 1114 if (crdp->cl_reply) { 1115 clist_free(crdp->cl_reply); 1116 crdp->cl_reply = NULL; 1117 } 1118 RDMA_REL_CONN(crdp->conn); 1119 1120 return (retval); 1121 } 1122 1123 /* ARGSUSED */ 1124 static int32_t * 1125 svc_rdma_kgetres(SVCXPRT *clone_xprt, int size) 1126 { 1127 return (NULL); 1128 } 1129 1130 /* ARGSUSED */ 1131 static void 1132 svc_rdma_kfreeres(SVCXPRT *clone_xprt) 1133 { 1134 } 1135 1136 /* 1137 * the dup cacheing routines below provide a cache of non-failure 1138 * transaction id's. rpc service routines can use this to detect 1139 * retransmissions and re-send a non-failure response. 1140 */ 1141 1142 /* 1143 * MAXDUPREQS is the number of cached items. It should be adjusted 1144 * to the service load so that there is likely to be a response entry 1145 * when the first retransmission comes in. 1146 */ 1147 #define MAXDUPREQS 8192 1148 1149 /* 1150 * This should be appropriately scaled to MAXDUPREQS. To produce as less as 1151 * possible collisions it is suggested to set this to a prime. 1152 */ 1153 #define DRHASHSZ 2053 1154 1155 #define XIDHASH(xid) ((xid) % DRHASHSZ) 1156 #define DRHASH(dr) XIDHASH((dr)->dr_xid) 1157 #define REQTOXID(req) ((req)->rq_xprt->xp_xid) 1158 1159 static int rdmandupreqs = 0; 1160 int rdmamaxdupreqs = MAXDUPREQS; 1161 static kmutex_t rdmadupreq_lock; 1162 static struct dupreq *rdmadrhashtbl[DRHASHSZ]; 1163 static int rdmadrhashstat[DRHASHSZ]; 1164 1165 static void unhash(struct dupreq *); 1166 1167 /* 1168 * rdmadrmru points to the head of a circular linked list in lru order. 1169 * rdmadrmru->dr_next == drlru 1170 */ 1171 struct dupreq *rdmadrmru; 1172 1173 /* 1174 * svc_rdma_kdup searches the request cache and returns 0 if the 1175 * request is not found in the cache. If it is found, then it 1176 * returns the state of the request (in progress or done) and 1177 * the status or attributes that were part of the original reply. 1178 */ 1179 static int 1180 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp, 1181 bool_t *dupcachedp) 1182 { 1183 struct dupreq *dr; 1184 uint32_t xid; 1185 uint32_t drhash; 1186 int status; 1187 1188 xid = REQTOXID(req); 1189 mutex_enter(&rdmadupreq_lock); 1190 RSSTAT_INCR(rsdupchecks); 1191 /* 1192 * Check to see whether an entry already exists in the cache. 1193 */ 1194 dr = rdmadrhashtbl[XIDHASH(xid)]; 1195 while (dr != NULL) { 1196 if (dr->dr_xid == xid && 1197 dr->dr_proc == req->rq_proc && 1198 dr->dr_prog == req->rq_prog && 1199 dr->dr_vers == req->rq_vers && 1200 dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len && 1201 bcmp((caddr_t)dr->dr_addr.buf, 1202 (caddr_t)req->rq_xprt->xp_rtaddr.buf, 1203 dr->dr_addr.len) == 0) { 1204 status = dr->dr_status; 1205 if (status == DUP_DONE) { 1206 bcopy(dr->dr_resp.buf, res, size); 1207 if (dupcachedp != NULL) 1208 *dupcachedp = (dr->dr_resfree != NULL); 1209 } else { 1210 dr->dr_status = DUP_INPROGRESS; 1211 *drpp = dr; 1212 } 1213 RSSTAT_INCR(rsdupreqs); 1214 mutex_exit(&rdmadupreq_lock); 1215 return (status); 1216 } 1217 dr = dr->dr_chain; 1218 } 1219 1220 /* 1221 * There wasn't an entry, either allocate a new one or recycle 1222 * an old one. 1223 */ 1224 if (rdmandupreqs < rdmamaxdupreqs) { 1225 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP); 1226 if (dr == NULL) { 1227 mutex_exit(&rdmadupreq_lock); 1228 return (DUP_ERROR); 1229 } 1230 dr->dr_resp.buf = NULL; 1231 dr->dr_resp.maxlen = 0; 1232 dr->dr_addr.buf = NULL; 1233 dr->dr_addr.maxlen = 0; 1234 if (rdmadrmru) { 1235 dr->dr_next = rdmadrmru->dr_next; 1236 rdmadrmru->dr_next = dr; 1237 } else { 1238 dr->dr_next = dr; 1239 } 1240 rdmandupreqs++; 1241 } else { 1242 dr = rdmadrmru->dr_next; 1243 while (dr->dr_status == DUP_INPROGRESS) { 1244 dr = dr->dr_next; 1245 if (dr == rdmadrmru->dr_next) { 1246 mutex_exit(&rdmadupreq_lock); 1247 return (DUP_ERROR); 1248 } 1249 } 1250 unhash(dr); 1251 if (dr->dr_resfree) { 1252 (*dr->dr_resfree)(dr->dr_resp.buf); 1253 } 1254 } 1255 dr->dr_resfree = NULL; 1256 rdmadrmru = dr; 1257 1258 dr->dr_xid = REQTOXID(req); 1259 dr->dr_prog = req->rq_prog; 1260 dr->dr_vers = req->rq_vers; 1261 dr->dr_proc = req->rq_proc; 1262 if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) { 1263 if (dr->dr_addr.buf != NULL) 1264 kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen); 1265 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len; 1266 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP); 1267 if (dr->dr_addr.buf == NULL) { 1268 dr->dr_addr.maxlen = 0; 1269 dr->dr_status = DUP_DROP; 1270 mutex_exit(&rdmadupreq_lock); 1271 return (DUP_ERROR); 1272 } 1273 } 1274 dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len; 1275 bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len); 1276 if (dr->dr_resp.maxlen < size) { 1277 if (dr->dr_resp.buf != NULL) 1278 kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen); 1279 dr->dr_resp.maxlen = (unsigned int)size; 1280 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP); 1281 if (dr->dr_resp.buf == NULL) { 1282 dr->dr_resp.maxlen = 0; 1283 dr->dr_status = DUP_DROP; 1284 mutex_exit(&rdmadupreq_lock); 1285 return (DUP_ERROR); 1286 } 1287 } 1288 dr->dr_status = DUP_INPROGRESS; 1289 1290 drhash = (uint32_t)DRHASH(dr); 1291 dr->dr_chain = rdmadrhashtbl[drhash]; 1292 rdmadrhashtbl[drhash] = dr; 1293 rdmadrhashstat[drhash]++; 1294 mutex_exit(&rdmadupreq_lock); 1295 *drpp = dr; 1296 return (DUP_NEW); 1297 } 1298 1299 /* 1300 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP) 1301 * and stores the response. 1302 */ 1303 static void 1304 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(), 1305 int size, int status) 1306 { 1307 ASSERT(dr->dr_resfree == NULL); 1308 if (status == DUP_DONE) { 1309 bcopy(res, dr->dr_resp.buf, size); 1310 dr->dr_resfree = dis_resfree; 1311 } 1312 dr->dr_status = status; 1313 } 1314 1315 /* 1316 * This routine expects that the mutex, rdmadupreq_lock, is already held. 1317 */ 1318 static void 1319 unhash(struct dupreq *dr) 1320 { 1321 struct dupreq *drt; 1322 struct dupreq *drtprev = NULL; 1323 uint32_t drhash; 1324 1325 ASSERT(MUTEX_HELD(&rdmadupreq_lock)); 1326 1327 drhash = (uint32_t)DRHASH(dr); 1328 drt = rdmadrhashtbl[drhash]; 1329 while (drt != NULL) { 1330 if (drt == dr) { 1331 rdmadrhashstat[drhash]--; 1332 if (drtprev == NULL) { 1333 rdmadrhashtbl[drhash] = drt->dr_chain; 1334 } else { 1335 drtprev->dr_chain = drt->dr_chain; 1336 } 1337 return; 1338 } 1339 drtprev = drt; 1340 drt = drt->dr_chain; 1341 } 1342 } 1343 1344 bool_t 1345 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist) 1346 { 1347 struct clist *clist; 1348 uint32_t tlen; 1349 1350 if (req->rq_xprt->xp_type != T_RDMA) { 1351 return (FALSE); 1352 } 1353 1354 tlen = 0; 1355 clist = wlist; 1356 while (clist) { 1357 tlen += clist->c_len; 1358 clist = clist->c_next; 1359 } 1360 1361 /* 1362 * set iov to addr+len of first segment of first wchunk of 1363 * wlist sent by client. krecv() already malloc'd a buffer 1364 * large enough, but registration is deferred until we write 1365 * the buffer back to (NFS) client using RDMA_WRITE. 1366 */ 1367 iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr; 1368 iov->iov_len = tlen; 1369 1370 return (TRUE); 1371 } 1372 1373 /* 1374 * routine to setup the read chunk lists 1375 */ 1376 1377 int 1378 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len) 1379 { 1380 int data_len, avail_len; 1381 uint_t round_len; 1382 1383 data_len = avail_len = 0; 1384 1385 while (wcl != NULL && count > 0) { 1386 if (wcl->c_dmemhandle.mrc_rmr == 0) 1387 break; 1388 1389 if (wcl->c_len < count) { 1390 data_len += wcl->c_len; 1391 avail_len = 0; 1392 } else { 1393 data_len += count; 1394 avail_len = wcl->c_len - count; 1395 wcl->c_len = count; 1396 } 1397 count -= wcl->c_len; 1398 1399 if (count == 0) 1400 break; 1401 1402 wcl = wcl->c_next; 1403 } 1404 1405 /* 1406 * MUST fail if there are still more data 1407 */ 1408 if (count > 0) { 1409 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len, 1410 int, data_len, int, count); 1411 return (FALSE); 1412 } 1413 1414 /* 1415 * Round up the last chunk to 4-byte boundary 1416 */ 1417 *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT); 1418 round_len = *wcl_len - data_len; 1419 1420 if (round_len) { 1421 1422 /* 1423 * If there is space in the current chunk, 1424 * add the roundup to the chunk. 1425 */ 1426 if (avail_len >= round_len) { 1427 wcl->c_len += round_len; 1428 } else { 1429 /* 1430 * try the next one. 1431 */ 1432 wcl = wcl->c_next; 1433 if ((wcl == NULL) || (wcl->c_len < round_len)) { 1434 DTRACE_PROBE1( 1435 krpc__e__rdma_setup_read_chunks_rndup, 1436 int, round_len); 1437 return (FALSE); 1438 } 1439 wcl->c_len = round_len; 1440 } 1441 } 1442 1443 wcl = wcl->c_next; 1444 1445 /* 1446 * Make rest of the chunks 0-len 1447 */ 1448 1449 clist_zero_len(wcl); 1450 1451 return (TRUE); 1452 } 1453