/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/ib.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

static void
rdsv3_ib_send_rdma_complete(struct rdsv3_message *rm,
    int wc_status)
{
	int notify_status;

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);

	switch (wc_status) {
	case IBT_WC_WR_FLUSHED_ERR:
		return;

	case IBT_WC_SUCCESS:
		notify_status = RDSV3_RDMA_SUCCESS;
		break;

	case IBT_WC_REMOTE_ACCESS_ERR:
		notify_status = RDSV3_RDMA_REMOTE_ERROR;
		break;

	default:
		notify_status = RDSV3_RDMA_OTHER_ERROR;
		break;
	}
	rdsv3_rdma_send_complete(rm, notify_status);

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);
}

static void rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev,
    uint_t num, struct rdsv3_rdma_sg scat[]);

void
rdsv3_ib_send_unmap_rdma(struct rdsv3_ib_connection *ic,
    struct rdsv3_rdma_op *op)
{
	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rdma", "ic: %p, op: %p", ic, op);
	if (op->r_mapped) {
		op->r_mapped = 0;
		if (ic->i_cm_id) {
			rdsv3_ib_dma_unmap_sg_rdma(ic->i_cm_id->device,
			    op->r_nents, op->r_rdma_sg);
		} else {
			rdsv3_ib_dma_unmap_sg_rdma((struct ib_device *)NULL,
			    op->r_nents, op->r_rdma_sg);
		}
	}
}

static void
rdsv3_ib_send_unmap_rm(struct rdsv3_ib_connection *ic,
    struct rdsv3_ib_send_work *send,
    int wc_status)
{
	struct rdsv3_message *rm = send->s_rm;

	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rm", "ic %p send %p rm %p\n",
	    ic, send, rm);

	rdsv3_ib_dma_unmap_sg(ic->i_cm_id->device,
	    rm->m_sg, rm->m_nents);

	if (rm->m_rdma_op != NULL) {
		rdsv3_ib_send_unmap_rdma(ic, rm->m_rdma_op);

		/*
		 * If the user asked for a completion notification on this
		 * message, we can implement three different semantics:
		 * 1. Notify when we received the ACK on the RDS message
		 *    that was queued with the RDMA. This provides reliable
		 *    notification of RDMA status at the expense of a one-way
		 *    packet delay.
		 * 2. Notify when the IB stack gives us the completion
		 *    event for the RDMA operation.
		 * 3. Notify when the IB stack gives us the completion
		 *    event for the accompanying RDS messages.
		 * Here, we implement approach #3. To implement approach #2,
		 * call rdsv3_rdma_send_complete from the cq_handler.
		 * To implement #1, don't call rdsv3_rdma_send_complete at
		 * all, and fall back to the notify handling in the ACK
		 * processing code.
		 *
		 * Note: There's no need to explicitly sync any RDMA buffers
		 * using ib_dma_sync_sg_for_cpu - the completion for the RDMA
		 * operation itself unmapped the RDMA buffers, which takes
		 * care of synching.
		 */
		rdsv3_ib_send_rdma_complete(rm, wc_status);

		if (rm->m_rdma_op->r_write)
			rdsv3_stats_add(s_send_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
		else
			rdsv3_stats_add(s_recv_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
	}

	/*
	 * If anyone waited for this message to get flushed out, wake
	 * them up now
	 */
	rdsv3_message_unmapped(rm);

	rdsv3_message_put(rm);
	send->s_rm = NULL;
}

void
rdsv3_ib_send_init_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_init_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		send->s_rm = NULL;
		send->s_op = NULL;
	}
}

void
rdsv3_ib_send_clear_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		if (send->s_opcode == 0xdd)
			continue;
		if (send->s_rm)
			rdsv3_ib_send_unmap_rm(ic, send, IBT_WC_WR_FLUSHED_ERR);
		if (send->s_op)
			rdsv3_ib_send_unmap_rdma(ic, send->s_op);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "Return: ic: %p", ic);
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path. As the sender allocs and potentially
 * unallocs the next free entry in the ring it doesn't alter which is
 * the next to be freed, which is what this is concerned with.
 */
void
rdsv3_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
{
	struct rdsv3_connection *conn = context;
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	ibt_wc_t wc;
	struct rdsv3_ib_send_work *send;
	uint32_t completed, polled;
	uint32_t oldest;
	uint32_t i = 0;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler", "conn: %p cq: %p",
	    conn, cq);

	rdsv3_ib_stats_inc(s_ib_tx_cq_call);
	ret = ibt_enable_cq_notify(RDSV3_CQ2CQHDL(cq), IBT_NEXT_COMPLETION);
	if (ret)
		RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
		    "ib_req_notify_cq send failed: %d", ret);

	while (ibt_poll_cq(RDSV3_CQ2CQHDL(cq), &wc, 1, &polled) ==
	    IBT_SUCCESS) {
		RDSV3_DPRINTF5("rdsv3_ib_send_cq_comp_handler",
		    "swc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
		    (unsigned long long)wc.wc_id, wc.wc_status,
		    wc.wc_bytes_xfer, ntohl(wc.wc_immed_data));
		rdsv3_ib_stats_inc(s_ib_tx_cq_event);

		if (wc.wc_id == RDSV3_IB_ACK_WR_ID) {
			if (ic->i_ack_queued + HZ/2 < jiffies)
				rdsv3_ib_stats_inc(s_ib_tx_stalled);
			rdsv3_ib_ack_send_complete(ic);
			continue;
		}

		oldest = rdsv3_ib_ring_oldest(&ic->i_send_ring);

		completed = rdsv3_ib_ring_completed(&ic->i_send_ring,
		    wc.wc_id, oldest);

		for (i = 0; i < completed; i++) {
			send = &ic->i_sends[oldest];

			/*
			 * In the error case, wc.opcode sometimes contains
			 * garbage
			 */
			switch (send->s_opcode) {
			case IBT_WRC_SEND:
				if (send->s_rm)
					rdsv3_ib_send_unmap_rm(ic, send,
					    wc.wc_status);
				break;
			case IBT_WRC_RDMAW:
			case IBT_WRC_RDMAR:
				/*
				 * Nothing to be done - the SG list will
				 * be unmapped when the SEND completes.
				 */
				break;
			default:
#ifndef __lock_lint
				RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
				    "RDS/IB: %s: unexpected opcode "
				    "0x%x in WR!",
				    __func__, send->s_opcode);
#endif
				break;
			}

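			/*
			 * Mark this ring entry as already reaped; the 0xdd
			 * opcode is the sentinel that lets
			 * rdsv3_ib_send_clear_ring() skip entries whose
			 * completion has already been processed here.
			 */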
			send->s_opcode = 0xdd;
			if (send->s_queued + HZ/2 < jiffies)
				rdsv3_ib_stats_inc(s_ib_tx_stalled);

			/*
			 * If an RDMA operation produced an error, signal
			 * this right away. If we don't, the subsequent SEND
			 * that goes with this RDMA will be canceled with
			 * ERR_WFLUSH, and the application never learns that
			 * the RDMA failed.
			 */
			if (wc.wc_status ==
			    IBT_WC_REMOTE_ACCESS_ERR && send->s_op) {
				struct rdsv3_message *rm;

				rm = rdsv3_send_get_message(conn, send->s_op);
				if (rm) {
					if (rm->m_rdma_op != NULL)
						rdsv3_ib_send_unmap_rdma(ic,
						    rm->m_rdma_op);
					rdsv3_ib_send_rdma_complete(rm,
					    wc.wc_status);
					rdsv3_message_put(rm);
				}
			}

			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
		}

		RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler", "compl: %d",
		    completed);
		rdsv3_ib_ring_free(&ic->i_send_ring, completed);

		if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ||
		    test_bit(0, &conn->c_map_queued))
			rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

		/* We expect errors as the qp is drained during shutdown */
		if (wc.wc_status != IBT_WC_SUCCESS && rdsv3_conn_up(conn)) {
			RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
			    "send completion on %u.%u.%u.%u "
			    "had status %u, disconnecting and reconnecting\n",
			    NIPQUAD(conn->c_faddr), wc.wc_status);
			rdsv3_conn_drop(conn);
		}
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler",
	    "Return: conn: %p, cq: %p", conn, cq);
}

/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  - send credits: this tells us how many WRs we're allowed
 *    to submit without overrunning the receiver's queue. For
 *    each SEND WR we post, we decrement this by one.
 *
 *  - posted credits: this tells us how many WRs we recently
 *    posted to the receive queue. This value is transferred
 *    to the peer as a "credit update" in an RDS header field.
 *    Every time we transmit credits to the peer, we subtract
 *    the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rdsv3_ib_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rdsv3_send_xmit
 * grabs c_send_lock to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter. Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg when updating the two counters.
 */
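/*
 * As an illustration (assuming the usual RDS packing of ic->i_credits in
 * ib.h: send credits in the low 16 bits, posted credits in the high 16
 * bits), a snapshot of both counters is taken and decomposed like this:
 *
 *	oldval = atomic_get(&ic->i_credits);
 *	avail  = IB_GET_SEND_CREDITS(oldval);
 *	posted = IB_GET_POST_CREDITS(oldval);
 *
 * and the combined word is only written back with atomic_cmpxchg(), so a
 * concurrent update by the receive path simply forces a retry rather than
 * losing credits.
 */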
int
rdsv3_ib_send_grab_credits(struct rdsv3_ib_connection *ic,
    uint32_t wanted, uint32_t *adv_credits, int need_posted)
{
	unsigned int avail, posted, got = 0, advertise;
	long oldval, newval;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, wanted, *adv_credits, need_posted);

	*adv_credits = 0;
	if (!ic->i_flowctl)
		return (wanted);

try_again:
	advertise = 0;
	oldval = newval = atomic_get(&ic->i_credits);
	posted = IB_GET_POST_CREDITS(oldval);
	avail = IB_GET_SEND_CREDITS(oldval);

	RDSV3_DPRINTF5("rdsv3_ib_send_grab_credits",
	    "wanted (%u): credits=%u posted=%u\n", wanted, avail, posted);

	/* The last credit must be used to send a credit update. */
	if (avail && !posted)
		avail--;

	if (avail < wanted) {
		struct rdsv3_connection *conn = ic->i_cm_id->context;

		/* Oops, there aren't that many credits left! */
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		got = avail;
	} else {
		/* Sometimes you get what you want, lalala. */
		got = wanted;
	}
	newval -= IB_SET_SEND_CREDITS(got);

	/*
	 * If need_posted is non-zero, then the caller wants the posted
	 * credits advertised regardless of whether any send credits are
	 * available.
	 */
	if (posted && (got || need_posted)) {
		advertise = min(posted, RDSV3_MAX_ADV_CREDIT);
		newval -= IB_SET_POST_CREDITS(advertise);
	}

	/* Finally bill everything */
	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
		goto try_again;

	*adv_credits = advertise;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, got, *adv_credits, need_posted);

	return (got);
}

void
rdsv3_ib_send_add_credits(struct rdsv3_connection *conn, unsigned int credits)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	if (credits == 0)
		return;

	RDSV3_DPRINTF5("rdsv3_ib_send_add_credits",
	    "credits (%u): current=%u%s\n",
	    credits,
	    IB_GET_SEND_CREDITS(atomic_get(&ic->i_credits)),
	    test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ?
	    ", ll_send_full" : "");

	atomic_add_32(&ic->i_credits, IB_SET_SEND_CREDITS(credits));
	if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

	ASSERT(!(IB_GET_SEND_CREDITS(credits) >= 16384));

	rdsv3_ib_stats_inc(s_ib_rx_credit_updates);

	RDSV3_DPRINTF4("rdsv3_ib_send_add_credits",
	    "Return: conn: %p, credits: %d",
	    conn, credits);
}

void
rdsv3_ib_advertise_credits(struct rdsv3_connection *conn, unsigned int posted)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_advertise_credits", "conn: %p, posted: %d",
	    conn, posted);

	if (posted == 0)
		return;

	atomic_add_32(&ic->i_credits, IB_SET_POST_CREDITS(posted));

	/*
	 * Decide whether to send an update to the peer now.
	 * If we would send a credit update for every single buffer we
	 * post, we would end up with an ACK storm (ACK arrives,
	 * consumes buffer, we refill the ring, send ACK to remote
	 * advertising the newly posted buffer... ad inf)
	 *
	 * Performance pretty much depends on how often we send
	 * credit updates - too frequent updates mean lots of ACKs.
	 * Too infrequent updates, and the peer will run out of
	 * credits and have to throttle.
	 * For the time being, 16 seems to be a good compromise.
	 */
	if (IB_GET_POST_CREDITS(atomic_get(&ic->i_credits)) >= 16)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

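/*
 * Build a SEND work request for one fragment.  wr_sgl[0] always carries the
 * RDS header, taken from the long-lived ring of pre-mapped headers at
 * i_send_hdrs_dma; any data SGEs are filled in starting at index 1, sliced
 * out of the message's mapped scatterlist beginning at 'off'.
 */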
static inline void
rdsv3_ib_xmit_populate_wr(struct rdsv3_ib_connection *ic,
    ibt_send_wr_t *wr, unsigned int pos,
    struct rdsv3_scatterlist *scat, unsigned int off, unsigned int length,
    int send_flags)
{
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "ic: %p, wr: %p scat: %p %d %d %d %d",
	    ic, wr, scat, pos, off, length, send_flags);

	wr->wr_id = pos;
	wr->wr_trans = IBT_RC_SRV;
	wr->wr_flags = send_flags;
	wr->wr_opcode = IBT_WRC_SEND;

	if (length != 0) {
		int ix, len, assigned;
		ibt_wr_ds_t *sgl;

		ASSERT(length <= scat->length - off);

		sgl = scat->sgl;
		if (off != 0) {
			/* find the right sgl to begin with */
			while (sgl->ds_len <= off) {
				off -= sgl->ds_len;
				sgl++;
			}
		}

		ix = 1; /* first data sgl is at 1 */
		assigned = 0;
		len = length;
		do {
			sge = &wr->wr_sgl[ix++];
			sge->ds_va = sgl->ds_va + off;
			assigned = min(len, sgl->ds_len - off);
			sge->ds_len = assigned;
			sge->ds_key = sgl->ds_key;
			len -= assigned;
			if (len != 0) {
				sgl++;
				off = 0;
			}
		} while (len > 0);

		wr->wr_nds = ix;
	} else {
		/*
		 * We're sending a packet with no payload. There is only
		 * one SGE
		 */
		wr->wr_nds = 1;
	}

	sge = &wr->wr_sgl[0];
	sge->ds_va = ic->i_send_hdrs_dma + (pos * sizeof (struct rdsv3_header));
	sge->ds_len = sizeof (struct rdsv3_header);
	sge->ds_key = ic->i_mr->lkey;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "Return: ic: %p, wr: %p scat: %p", ic, wr, scat);
}

/*
 * This can be called multiple times for a given message. The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests. We translate the scatterlist into a series
 * of work requests that fragment the message. These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection. This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
int
rdsv3_ib_xmit(struct rdsv3_connection *conn, struct rdsv3_message *rm,
    unsigned int hdr_off, unsigned int sg, unsigned int off)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct ib_device *dev = ic->i_cm_id->device;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_ib_send_work *first;
	struct rdsv3_ib_send_work *prev;
	ibt_send_wr_t *wr;
	struct rdsv3_scatterlist *scat;
	uint32_t pos;
	uint32_t i;
	uint32_t work_alloc;
	uint32_t credit_alloc;
	uint32_t posted;
	uint32_t adv_credits = 0;
	int send_flags = 0;
	int sent;
	int ret;
	int flow_controlled = 0;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "conn: %p, rm: %p", conn, rm);

	ASSERT(!(off % RDSV3_FRAG_SIZE));
	ASSERT(!(hdr_off != 0 && hdr_off != sizeof (struct rdsv3_header)));

	/* Do not send cong updates to IB loopback */
	if (conn->c_loopback &&
	    rm->m_inc.i_hdr.h_flags & RDSV3_FLAG_CONG_BITMAP) {
		rdsv3_cong_map_updated(conn->c_fcong, ~(uint64_t)0);
		return (sizeof (struct rdsv3_header) + RDSV3_CONG_MAP_BYTES);
	}

#ifndef __lock_lint
	/* FIXME we may overallocate here */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0)
		i = 1;
	else
		i = ceil(ntohl(rm->m_inc.i_hdr.h_len), RDSV3_FRAG_SIZE);
#endif

	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, i, &pos);
	if (work_alloc == 0) {
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		ret = -ENOMEM;
		goto out;
	}

	credit_alloc = work_alloc;
	if (ic->i_flowctl) {
		credit_alloc = rdsv3_ib_send_grab_credits(ic, work_alloc,
		    &posted, 0);
		adv_credits += posted;
		if (credit_alloc < work_alloc) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring,
			    work_alloc - credit_alloc);
			work_alloc = credit_alloc;
			flow_controlled++;
		}
		if (work_alloc == 0) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
			rdsv3_ib_stats_inc(s_ib_tx_throttle);
			ret = -ENOMEM;
			goto out;
		}
	}

	/* map the message the first time we see it */
	if (ic->i_rm == NULL) {
		/*
		 * printk(KERN_NOTICE
		 * "rdsv3_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
		 * be16_to_cpu(rm->m_inc.i_hdr.h_dport),
		 * rm->m_inc.i_hdr.h_flags,
		 * be32_to_cpu(rm->m_inc.i_hdr.h_len));
		 */
		if (rm->m_nents) {
			rm->m_count = rdsv3_ib_dma_map_sg(dev,
			    rm->m_sg, rm->m_nents);
			RDSV3_DPRINTF5("rdsv3_ib_xmit",
			    "ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
			if (rm->m_count == 0) {
				rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
				rdsv3_ib_ring_unalloc(&ic->i_send_ring,
				    work_alloc);
				ret = -ENOMEM; /* XXX ? */
				RDSV3_DPRINTF2("rdsv3_ib_xmit",
				    "fail: ic %p mapping rm %p: %d\n",
				    ic, rm, rm->m_count);
				goto out;
			}
		} else {
			rm->m_count = 0;
		}

		ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
		ic->i_unsignaled_bytes = rdsv3_ib_sysctl_max_unsig_bytes;
		rdsv3_message_addref(rm);
		ic->i_rm = rm;

		/* Finalize the header */
		if (test_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_ACK_REQUIRED;
		if (test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_RETRANSMITTED;

		/*
		 * If it has an RDMA op, tell the peer we did it. This is
		 * used by the peer to release use-once RDMA MRs.
		 */
		if (rm->m_rdma_op) {
			struct rdsv3_ext_header_rdma ext_hdr;

			ext_hdr.h_rdma_rkey = htonl(rm->m_rdma_op->r_key);
			(void) rdsv3_message_add_extension(&rm->m_inc.i_hdr,
			    RDSV3_EXTHDR_RDMA, &ext_hdr,
			    sizeof (ext_hdr));
		}
		if (rm->m_rdma_cookie) {
			(void) rdsv3_message_add_rdma_dest_extension(
			    &rm->m_inc.i_hdr,
			    rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
			    rdsv3_rdma_cookie_offset(rm->m_rdma_cookie));
		}

		/*
		 * Note - rdsv3_ib_piggyb_ack clears the ACK_REQUIRED bit, so
		 * we should not do this unless we have a chance of at least
		 * sticking the header into the send ring. Which is why we
		 * should call rdsv3_ib_ring_alloc first.
		 */
		rm->m_inc.i_hdr.h_ack = htonll(rdsv3_ib_piggyb_ack(ic));
		rdsv3_message_make_checksum(&rm->m_inc.i_hdr);

		/*
		 * Update adv_credits since we reset the ACK_REQUIRED bit.
		 */
		(void) rdsv3_ib_send_grab_credits(ic, 0, &posted, 1);
		adv_credits += posted;
		ASSERT(adv_credits <= 255);
	}

	send = &ic->i_sends[pos];
	first = send;
	prev = NULL;
	scat = &rm->m_sg[sg];
	sent = 0;
	i = 0;

	/*
	 * Sometimes you want to put a fence between an RDMA
	 * READ and the following SEND.
	 * We could either do this all the time
	 * or when requested by the user. Right now, we let
	 * the application choose.
	 */
	if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
		send_flags = IBT_WR_SEND_FENCE;

	/*
	 * We could be copying the header into the unused tail of the page.
	 * That would need to be changed in the future when those pages might
	 * be mapped userspace pages or page cache pages. So instead we always
	 * use a second sge and our long-lived ring of mapped headers. We send
	 * the header after the data so that the data payload can be aligned on
	 * the receiver.
	 */

	/* handle a 0-len message */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0) {
		wr = &ic->i_send_wrs[0];
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, NULL, 0, 0, send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;
		goto add_header;
	}

	/* if there's data, reference it with a chain of work reqs */
	for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
		unsigned int len;

		send = &ic->i_sends[pos];

		wr = &ic->i_send_wrs[i];
		len = min(RDSV3_FRAG_SIZE,
		    rdsv3_ib_sg_dma_len(dev, scat) - off);
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, scat, off, len,
		    send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;

		/*
		 * We want to delay signaling completions just enough to get
		 * the batching benefits but not so much that we create dead
		 * time on the wire.
		 */
		if (ic->i_unsignaled_wrs-- == 0) {
			ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		ic->i_unsignaled_bytes -= len;
		if (ic->i_unsignaled_bytes <= 0) {
			ic->i_unsignaled_bytes =
			    rdsv3_ib_sysctl_max_unsig_bytes;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		/*
		 * Always signal the last one if we're stopping due to flow
		 * control.
		 */
		if (flow_controlled && i == (work_alloc-1)) {
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		RDSV3_DPRINTF5("rdsv3_ib_xmit", "send %p wr %p num_sge %u \n",
		    send, wr, wr->wr_nds);

		sent += len;
		off += len;
		if (off == rdsv3_ib_sg_dma_len(dev, scat)) {
			scat++;
			off = 0;
		}

add_header:
		/*
		 * Tack on the header after the data. The header SGE should
		 * already have been set up to point to the right header
		 * buffer.
		 */
		(void) memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
		    sizeof (struct rdsv3_header));

		if (0) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			RDSV3_DPRINTF2("rdsv3_ib_xmit",
			    "send WR dport=%u flags=0x%x len=%d",
			    ntohs(hdr->h_dport),
			    hdr->h_flags,
			    ntohl(hdr->h_len));
		}
		if (adv_credits) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			/* add credit and redo the header checksum */
			hdr->h_credit = adv_credits;
			rdsv3_message_make_checksum(hdr);
			adv_credits = 0;
			rdsv3_ib_stats_inc(s_ib_tx_credit_updates);
		}

		prev = send;

		pos = (pos + 1) % ic->i_send_ring.w_nr;
	}

	/*
	 * Account the RDS header in the number of bytes we sent, but just
	 * once. The caller has no concept of fragmentation.
	 */
	if (hdr_off == 0)
		sent += sizeof (struct rdsv3_header);

	/* if we finished the message then send completion owns it */
	if (scat == &rm->m_sg[rm->m_count]) {
		prev->s_rm = ic->i_rm;
		wr->wr_flags |= IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		ic->i_rm = NULL;
	}

	if (i < work_alloc) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
		work_alloc = i;
	}
	if (ic->i_flowctl && i < credit_alloc)
		rdsv3_ib_send_add_credits(conn, credit_alloc - i);

	/* XXX need to worry about failed_wr and partial sends. */
	ret = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, i, &posted);
	if (posted != i) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "ic %p first %p nwr: %d ret %d:%d",
		    ic, first, i, ret, posted);
	}
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "RDS/IB: ib_post_send to %u.%u.%u.%u "
		    "returned %d\n", NIPQUAD(conn->c_faddr), ret);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		if (prev->s_rm) {
			ic->i_rm = prev->s_rm;
			prev->s_rm = NULL;
		}
		RDSV3_DPRINTF2("rdsv3_ib_xmit", "ibt_post_send failed\n");
		rdsv3_conn_drop(ic->conn);
		goto out;
	}

	ret = sent;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "Return: conn: %p, rm: %p", conn, rm);
out:
	ASSERT(!adv_credits);
	return (ret);
}

static void
rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev, uint_t num,
    struct rdsv3_rdma_sg scat[])
{
	ibt_hca_hdl_t hca_hdl;
	int i;
	int num_sgl;

	RDSV3_DPRINTF4("rdsv3_ib_dma_unmap_sg", "rdma_sg: %p", scat);

	if (dev) {
		hca_hdl = ib_get_ibt_hca_hdl(dev);
	} else {
		hca_hdl = scat[0].hca_hdl;
		RDSV3_DPRINTF2("rdsv3_ib_dma_unmap_sg_rdma",
		    "NULL dev use cached hca_hdl %p", hca_hdl);
	}

	if (hca_hdl == NULL)
		return;
	scat[0].hca_hdl = NULL;

	for (i = 0; i < num; i++) {
		if (scat[i].mihdl != NULL) {
			num_sgl = (scat[i].iovec.bytes / PAGESIZE) + 2;
			kmem_free(scat[i].swr.wr_sgl,
			    (num_sgl * sizeof (ibt_wr_ds_t)));
			scat[i].swr.wr_sgl = NULL;
			(void) ibt_unmap_mem_iov(hca_hdl, scat[i].mihdl);
			scat[i].mihdl = NULL;
		} else
			break;
	}
}

/* ARGSUSED */
uint_t
rdsv3_ib_dma_map_sg_rdma(struct ib_device *dev, struct rdsv3_rdma_sg scat[],
    uint_t num, struct rdsv3_scatterlist **scatl)
{
	ibt_hca_hdl_t hca_hdl;
	ibt_iov_attr_t iov_attr;
	struct buf *bp;
	uint_t i, j, k;
	uint_t count;
	struct rdsv3_scatterlist *sg;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "scat: %p, num: %d",
	    scat, num);

	hca_hdl = ib_get_ibt_hca_hdl(dev);
	scat[0].hca_hdl = hca_hdl;
	bzero(&iov_attr, sizeof (ibt_iov_attr_t));
	iov_attr.iov_flags = IBT_IOV_BUF;
	iov_attr.iov_lso_hdr_sz = 0;

	for (i = 0, count = 0; i < num; i++) {
		/* transpose umem_cookie to buf structure */
		bp = ddi_umem_iosetup(scat[i].umem_cookie,
		    scat[i].iovec.addr & PAGEOFFSET, scat[i].iovec.bytes,
		    B_WRITE, 0, 0, NULL, DDI_UMEM_SLEEP);
		if (bp == NULL) {
			/* free resources and return error */
			goto out;
		}
		/* setup ibt_map_mem_iov() attributes */
		iov_attr.iov_buf = bp;
		iov_attr.iov_wr_nds = (scat[i].iovec.bytes / PAGESIZE) + 2;
		scat[i].swr.wr_sgl =
		    kmem_zalloc(iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t),
		    KM_SLEEP);

		ret = ibt_map_mem_iov(hca_hdl, &iov_attr,
		    (ibt_all_wr_t *)&scat[i].swr, &scat[i].mihdl);
		freerbuf(bp);
		if (ret != IBT_SUCCESS) {
			RDSV3_DPRINTF2("rdsv3_ib_dma_map_sg_rdma",
			    "ibt_map_mem_iov returned: %d", ret);
			/* free resources and return error */
			kmem_free(scat[i].swr.wr_sgl,
			    iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t));
			goto out;
		}
		count += scat[i].swr.wr_nds;

#ifdef DEBUG
		for (j = 0; j < scat[i].swr.wr_nds; j++) {
			RDSV3_DPRINTF5("rdsv3_ib_dma_map_sg_rdma",
			    "sgl[%d] va %llx len %x", j,
			    scat[i].swr.wr_sgl[j].ds_va,
			    scat[i].swr.wr_sgl[j].ds_len);
		}
#endif
		RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma",
		    "iovec.bytes: 0x%x scat[%d]swr.wr_nds: %d",
		    scat[i].iovec.bytes, i, scat[i].swr.wr_nds);
	}

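	/*
	 * Return the number of send work requests needed to carry all the
	 * mapped SGEs, with at most RDSV3_IB_MAX_SGE SGEs per WR; this is
	 * the count that rdsv3_ib_xmit_rdma() allocates from the send ring.
	 */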
	count = ((count - 1) / RDSV3_IB_MAX_SGE) + 1;
	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "Ret: num: %d", count);
	return (count);

out:
	rdsv3_ib_dma_unmap_sg_rdma(dev, num, scat);
	return (0);
}

int
rdsv3_ib_xmit_rdma(struct rdsv3_connection *conn, struct rdsv3_rdma_op *op)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_rdma_sg *scat;
	uint64_t remote_addr;
	uint32_t pos;
	uint32_t work_alloc;
	uint32_t i, j, k, idx;
	uint32_t left, count;
	uint32_t posted;
	int sent;
	ibt_status_t status;
	ibt_send_wr_t *wr;
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "rdsv3_ib_conn: %p", ic);

	/* map the message the first time we see it */
	if (!op->r_mapped) {
		op->r_count = rdsv3_ib_dma_map_sg_rdma(ic->i_cm_id->device,
		    op->r_rdma_sg, op->r_nents, &op->r_sg);
		RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma", "ic %p mapping op %p: %d",
		    ic, op, op->r_count);
		if (op->r_count == 0) {
			rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
			RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
			    "fail: ic %p mapping op %p: %d",
			    ic, op, op->r_count);
			return (-ENOMEM); /* XXX ? */
		}
		op->r_mapped = 1;
	}

	/*
	 * Instead of knowing how to return a partial rdma read/write
	 * we insist that there be enough work requests to send the
	 * entire message.
	 */
	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, op->r_count, &pos);
	if (work_alloc != op->r_count) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		return (-ENOMEM);
	}

	/*
	 * take the scatter list and transpose into a list of
	 * send wr's each with a scatter list of RDSV3_IB_MAX_SGE
	 */
	scat = &op->r_rdma_sg[0];
	sent = 0;
	remote_addr = op->r_remote_addr;

	for (i = 0, k = 0; i < op->r_nents; i++) {
		left = scat[i].swr.wr_nds;
		for (idx = 0; left > 0; k++) {
			send = &ic->i_sends[pos];
			send->s_queued = jiffies;
			send->s_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			send->s_op = op;

			wr = &ic->i_send_wrs[k];
			wr->wr_flags = 0;
			wr->wr_id = pos;
			wr->wr_trans = IBT_RC_SRV;
			wr->wr_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			wr->wr.rc.rcwr.rdma.rdma_raddr = remote_addr;
			wr->wr.rc.rcwr.rdma.rdma_rkey = op->r_key;

			if (left > RDSV3_IB_MAX_SGE) {
				count = RDSV3_IB_MAX_SGE;
				left -= RDSV3_IB_MAX_SGE;
			} else {
				count = left;
				left = 0;
			}
			wr->wr_nds = count;

			for (j = 0; j < count; j++) {
				sge = &wr->wr_sgl[j];
				*sge = scat[i].swr.wr_sgl[idx];
				remote_addr += scat[i].swr.wr_sgl[idx].ds_len;
				sent += scat[i].swr.wr_sgl[idx].ds_len;
				idx++;
				RDSV3_DPRINTF4("xmit_rdma",
				    "send_wrs[%d]sgl[%d] va %llx len %x",
				    k, j, sge->ds_va, sge->ds_len);
			}
			RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma",
			    "wr[%d] %p key: %x code: %d tlen: %d",
			    k, wr, wr->wr.rc.rcwr.rdma.rdma_rkey,
			    wr->wr_opcode, sent);

			/*
			 * We want to delay signaling completions just enough
			 * to get the batching benefits but not so much that
			 * we create dead time on the wire.
			 */
			if (ic->i_unsignaled_wrs-- == 0) {
				ic->i_unsignaled_wrs =
				    rdsv3_ib_sysctl_max_unsig_wrs;
				wr->wr_flags = IBT_WR_SEND_SIGNAL;
			}

			pos = (pos + 1) % ic->i_send_ring.w_nr;
		}
	}

	status = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, k, &posted);
	if (status != IBT_SUCCESS) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
		    "RDS/IB: rdma ib_post_send to %u.%u.%u.%u "
		    "returned %d", NIPQUAD(conn->c_faddr), status);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
	}
	return (status);
}

void
rdsv3_ib_xmit_complete(struct rdsv3_connection *conn)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_complete", "conn: %p", conn);

	/*
	 * We may have a pending ACK or window update we were unable
	 * to send previously (due to flow control). Try again.
	 */
	rdsv3_ib_attempt_ack(ic);
}