/*
 * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <linux/tcp.h>
#include <asm/ioctls.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/socket.h>
#include <net/protocol.h>
#include <net/inet_common.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_fmr_pool.h>
#include <rdma/ib_umem.h>
#include <net/tcp.h> /* for memcpy_toiovec */
#include <asm/io.h>
#include <asm/uaccess.h>
#include <linux/delay.h>
#include "sdp.h"

static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct mbuf *mb;
        int payload_len;
        struct page *payload_pg;
        int off, len;
        struct ib_umem_chunk *chunk;

        WARN_ON(ssk->tx_sa);

        BUG_ON(!tx_sa);
        BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
        BUG_ON(!tx_sa->umem);
        BUG_ON(!tx_sa->umem->chunk_list.next);

        chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
        BUG_ON(!chunk->nmap);

        off = tx_sa->umem->offset;
        len = tx_sa->umem->length;

        tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

        mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
        if (!mb)
                return -ENOMEM;

        sdp_dbg_data(sk, "sending SrcAvail\n");

        TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is attached to the mb, but must
                                        * remain valid after the mb is freed */
        ssk->tx_sa = tx_sa;

        /* must have payload inlined in SrcAvail packet in combined mode */
        payload_len = MIN(tx_sa->umem->page_size - off, len);
        payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
        payload_pg = sg_page(&chunk->page_list[0]);
        get_page(payload_pg);

        sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
                off, payload_pg, payload_len);

        mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
                        payload_pg, off, payload_len);

        mb->len      += payload_len;
        mb->data_len  = payload_len;
        mb->truesize += payload_len;
//      sk->sk_wmem_queued += payload_len;
//      sk->sk_forward_alloc -= payload_len;

        mb_entail(sk, ssk, mb);

        ssk->write_seq += payload_len;
        SDP_SKB_CB(mb)->end_seq += payload_len;

        tx_sa->bytes_sent = tx_sa->umem->length;
        tx_sa->bytes_acked = payload_len;

        /* TODO: pushing the mb into the tx_queue should be enough */

        return 0;
}
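/*
 * Abort an outstanding SrcAvail advertisement: queue a SrcAvailCancel
 * message and arm srcavail_cancel_work so that, if the peer never answers
 * with RdmaRdCompl or SendSM, srcavail_cancel_timeout() below tears the
 * connection down.
 */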
static int sdp_post_srcavail_cancel(struct socket *sk)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct mbuf *mb;

        sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");

        mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
        mb_entail(sk, ssk, mb);

        sdp_post_sends(ssk, 0);

        schedule_delayed_work(&ssk->srcavail_cancel_work,
                        SDP_SRCAVAIL_CANCEL_TIMEOUT);

        return 0;
}

void srcavail_cancel_timeout(struct work_struct *work)
{
        struct sdp_sock *ssk =
                container_of(work, struct sdp_sock, srcavail_cancel_work.work);
        struct socket *sk = ssk->socket;

        lock_sock(sk);

        sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timed out."
                        " closing connection\n");
        sdp_set_error(sk, -ECONNRESET);
        wake_up(&ssk->wq);

        release_sock(sk);
}

static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
                int ignore_signals)
{
        struct socket *sk = ssk->socket;
        int err = 0;
        long vm_wait = 0;
        long current_timeo = *timeo_p;
        struct tx_srcavail_state *tx_sa = ssk->tx_sa;
        DEFINE_WAIT(wait);

        sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
        sdp_prf1(sk, NULL, "Going to sleep");
        while (ssk->qp_active) {
                prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

                if (unlikely(!*timeo_p)) {
                        err = -ETIME;
                        tx_sa->abort_flags |= TX_SA_TIMEDOUT;
                        sdp_prf1(sk, NULL, "timeout");
                        SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
                        break;
                } else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
                        err = -EINVAL;
                        sdp_dbg_data(sk, "acked bytes > sent bytes\n");
                        tx_sa->abort_flags |= TX_SA_ERROR;
                        break;
                }

                if (tx_sa->abort_flags & TX_SA_SENDSM) {
                        sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
                        SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
                        err = -EAGAIN;
                        break;
                }

                if (!ignore_signals) {
                        if (signal_pending(current)) {
                                err = -EINTR;
                                sdp_prf1(sk, NULL, "signalled");
                                tx_sa->abort_flags |= TX_SA_INTRRUPTED;
                                break;
                        }

                        if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
                                sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
                                tx_sa->abort_flags |= TX_SA_CROSS_SEND;
                                SDPSTATS_COUNTER_INC(zcopy_cross_send);
                                err = -ETIME;
                                break;
                        }
                }

                posts_handler_put(ssk);

                sk_wait_event(sk, &current_timeo,
                                tx_sa->abort_flags &&
                                ssk->rx_sa &&
                                (tx_sa->bytes_acked < tx_sa->bytes_sent) &&
                                vm_wait);
                sdp_dbg_data(ssk->socket, "woke up sleepers\n");

                posts_handler_get(ssk);

                if (tx_sa->bytes_acked == tx_sa->bytes_sent)
                        break;

                if (vm_wait) {
                        vm_wait -= current_timeo;
                        current_timeo = *timeo_p;
                        if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
                            (current_timeo -= vm_wait) < 0)
                                current_timeo = 0;
                        vm_wait = 0;
                }
                *timeo_p = current_timeo;
        }

        finish_wait(sk->sk_sleep, &wait);

        sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
                        tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);

        if (!ssk->qp_active) {
                sdp_dbg(sk, "QP destroyed while waiting\n");
                return -EINVAL;
        }
        return err;
}
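/*
 * Receive-side wait: sleep uninterruptibly (up to ~5 seconds) until the RDMA
 * read posted against the peer's SrcAvail buffer completes or the QP goes
 * away. rdma_inflight->busy is presumably cleared by the RDMA completion
 * path outside this file.
 */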
static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
        struct socket *sk = ssk->socket;
        long timeo = HZ * 5; /* Timeout for RDMA read */
        DEFINE_WAIT(wait);

        sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
        while (1) {
                prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

                if (!ssk->tx_ring.rdma_inflight->busy) {
                        sdp_dbg_data(sk, "got rdma cqe\n");
                        break;
                }

                if (!ssk->qp_active) {
                        sdp_dbg_data(sk, "QP destroyed\n");
                        break;
                }

                if (!timeo) {
                        sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
                        WARN_ON(1);
                        break;
                }

                posts_handler_put(ssk);

                sdp_prf1(sk, NULL, "Going to sleep");
                sk_wait_event(sk, &timeo,
                        !ssk->tx_ring.rdma_inflight->busy);
                sdp_prf1(sk, NULL, "Woke up");
                sdp_dbg_data(ssk->socket, "woke up sleepers\n");

                posts_handler_get(ssk);
        }

        finish_wait(sk->sk_sleep, &wait);

        sdp_dbg_data(sk, "Finished waiting\n");
}

int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
                struct rx_srcavail_state *rx_sa)
{
        struct mbuf *mb;
        int copied = rx_sa->used - rx_sa->reported;

        if (rx_sa->used <= rx_sa->reported)
                return 0;

        mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);

        rx_sa->reported += copied;

        /* TODO: What if no tx_credits available? */
        sdp_post_send(ssk, mb);

        return 0;
}

int sdp_post_sendsm(struct socket *sk)
{
        struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);

        sdp_post_send(sdp_sk(sk), mb);

        return 0;
}

static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
{
        sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
        while (len > 0) {
                if (iov->iov_len) {
                        int copy = min_t(unsigned int, iov->iov_len, len);
                        len -= copy;
                        iov->iov_len -= copy;
                        iov->iov_base += copy;
                }
                iov++;
        }

        return 0;
}

static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
{
        int bytes = 0;

        while (sge_cnt > 0) {
                bytes += sge->length;
                sge++;
                sge_cnt--;
        }

        return bytes;
}

void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
{
        struct socket *sk = ssk->socket;
        unsigned long flags;

        spin_lock_irqsave(&ssk->tx_sa_lock, flags);

        if (!ssk->tx_sa) {
                sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
                goto out;
        }

        if (ssk->tx_sa->mseq > mseq_ack) {
                sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
                        "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
                        mseq_ack, ssk->tx_sa->mseq);
                goto out;
        }

        sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");

        ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
        cancel_delayed_work(&ssk->srcavail_cancel_work);

        wake_up(sk->sk_sleep);
        sdp_dbg_data(sk, "woke up sleepers\n");

out:
        spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
                u32 bytes_completed)
{
        struct socket *sk = ssk->socket;
        unsigned long flags;

        sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
        sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);

        spin_lock_irqsave(&ssk->tx_sa_lock, flags);

        BUG_ON(!ssk);

        if (!ssk->tx_sa) {
                sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
                goto out;
        }

        if (ssk->tx_sa->mseq > mseq_ack) {
                sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
                        "RdmaRdCompl mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
                        mseq_ack, ssk->tx_sa->mseq);
                goto out;
        }

        ssk->tx_sa->bytes_acked += bytes_completed;

        wake_up(sk->sk_sleep);
        sdp_dbg_data(sk, "woke up sleepers\n");

out:
        spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
        return;
}

static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
{
        unsigned long avail;
        unsigned long lock_limit;

        if (capable(CAP_IPC_LOCK))
                return ULONG_MAX;

        lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
        avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);

        return avail - offset;
}

/*
 * Pin the user buffer and map it through the device's FMR pool so it can be
 * used for RDMA. The request is silently truncated to SDP_MAX_RDMA_READ_LEN
 * and to what RLIMIT_MEMLOCK still allows.
 */
static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
        struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
        struct ib_pool_fmr *fmr;
        struct ib_umem *umem;
        struct ib_device *dev;
        u64 *pages;
        struct ib_umem_chunk *chunk;
        int n, j, k;
        int rc = 0;
        unsigned long max_lockable_bytes;

        if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
                sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
                        len, SDP_MAX_RDMA_READ_LEN);
                len = SDP_MAX_RDMA_READ_LEN;
        }

        max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
        if (unlikely(len > max_lockable_bytes)) {
                sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
                        len, max_lockable_bytes);
                len = max_lockable_bytes;
        }

        sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
                        uaddr, len, max_lockable_bytes);

        umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
                IB_ACCESS_REMOTE_WRITE, 0);

        if (IS_ERR(umem)) {
                rc = PTR_ERR(umem);
                sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
                sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
                                current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
                                current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
                                capable(CAP_IPC_LOCK));
                goto err_umem_get;
        }

        sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
                umem->offset, umem->length);

        pages = (u64 *) __get_free_page(GFP_KERNEL);
        if (!pages) {
                rc = -ENOMEM;
                goto err_pages_alloc;
        }

        n = 0;

        dev = sdp_sk(sk)->ib_device;
        list_for_each_entry(chunk, &umem->chunk_list, list) {
                for (j = 0; j < chunk->nmap; ++j) {
                        len = ib_sg_dma_len(dev,
                                        &chunk->page_list[j]) >> PAGE_SHIFT;

                        for (k = 0; k < len; ++k) {
                                pages[n++] = ib_sg_dma_address(dev,
                                                &chunk->page_list[j]) +
                                        umem->page_size * k;
                        }
                }
        }

        fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
        if (IS_ERR(fmr)) {
                rc = PTR_ERR(fmr);
                sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
                goto err_fmr_alloc;
        }

        free_page((unsigned long) pages);

        *_umem = umem;
        *_fmr = fmr;

        return 0;

err_fmr_alloc:
        free_page((unsigned long) pages);

err_pages_alloc:
        ib_umem_release(umem);

err_umem_get:
        return rc;
}

void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
        if (!sdp_sk(sk)->qp_active)
                return;

        ib_fmr_pool_unmap(*_fmr);
        *_fmr = NULL;

        ib_umem_release(*_umem);
        *_umem = NULL;
}
static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct ib_send_wr *bad_wr;
        struct ib_send_wr wr = { NULL };
        struct ib_sge sge;

        wr.opcode = IB_WR_RDMA_READ;
        wr.next = NULL;
        wr.wr_id = SDP_OP_RDMA;
        wr.wr.rdma.rkey = rx_sa->rkey;
        wr.send_flags = 0;

        ssk->tx_ring.rdma_inflight = rx_sa;

        sge.addr = rx_sa->umem->offset;
        sge.length = rx_sa->umem->length;
        sge.lkey = rx_sa->fmr->fmr->lkey;

        wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
        wr.num_sge = 1;
        wr.sg_list = &sge;
        rx_sa->busy++;

        wr.send_flags = IB_SEND_SIGNALED;

        return ib_post_send(ssk->qp, &wr, &bad_wr);
}

int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
                unsigned long *used)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
        int got_srcavail_cancel;
        int rc = 0;
        int len = *used;
        int copied;

        sdp_dbg_data(ssk->socket, "preparing RDMA read."
                " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);

        sock_hold(sk, SOCK_REF_RDMA_RD);

        if (len > rx_sa->len) {
                sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
                WARN_ON(1);
                len = rx_sa->len;
        }

        rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
        if (rc) {
                sdp_warn(sk, "Error allocating fmr: %d\n", rc);
                goto err_alloc_fmr;
        }

        rc = sdp_post_rdma_read(sk, rx_sa);
        if (unlikely(rc)) {
                sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
                sdp_set_error(ssk->socket, -ECONNRESET);
                wake_up(&ssk->wq);
                goto err_post_send;
        }

        sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);

        got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;

        sdp_arm_tx_cq(sk);

        sdp_wait_rdma_wr_finished(ssk);

        sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
        if (!ssk->qp_active) {
                sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
                rc = -EPIPE;
                goto err_post_send;
        }

        copied = rx_sa->umem->length;

        sdp_update_iov_used(sk, iov, copied);
        rx_sa->used += copied;
        atomic_add(copied, &ssk->rcv_nxt);
        *used = copied;

        ssk->tx_ring.rdma_inflight = NULL;

err_post_send:
        sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

err_alloc_fmr:
        if (rc && ssk->qp_active) {
                sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
                rx_sa->flags |= RX_SA_ABORTED;
        }

        sock_put(sk, SOCK_REF_RDMA_RD);

        return rc;
}

static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        int ret = 0;
        int credits_needed = 1;

        sdp_dbg_data(sk, "Wait for mem\n");

        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

        SDPSTATS_COUNTER_INC(send_wait_for_mem);

        sdp_do_posts(ssk);

        sdp_xmit_poll(ssk, 1);

        ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);

        return ret;
}
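/*
 * Send one zero-copy chunk: register the current iovec, advertise it with a
 * SrcAvail and block until the peer reports RdmaRdCompl (or rejects it with
 * SendSM). On timeout a SrcAvailCancel is posted and we wait once more,
 * ignoring signals, for the peer to close the transaction before the FMR is
 * released.
 */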
static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
                struct iovec *iov, long *timeo)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        int rc = 0;
        unsigned long lock_flags;

        rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
                        &tx_sa->fmr, &tx_sa->umem);
        if (rc) {
                sdp_warn(sk, "Error allocating fmr: %d\n", rc);
                goto err_alloc_fmr;
        }

        if (tx_slots_free(ssk) == 0) {
                rc = wait_for_sndbuf(sk, timeo);
                if (rc) {
                        sdp_warn(sk, "Couldn't get send buffer\n");
                        goto err_no_tx_slots;
                }
        }

        rc = sdp_post_srcavail(sk, tx_sa);
        if (rc) {
                sdp_dbg(sk, "Error posting SrcAvail\n");
                goto err_abort_send;
        }

        rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
        if (unlikely(rc)) {
                enum tx_sa_flag f = tx_sa->abort_flags;

                if (f & TX_SA_SENDSM) {
                        sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
                } else if (f & TX_SA_ERROR) {
                        sdp_dbg_data(sk, "SrcAvail error completion\n");
                        sdp_reset(sk);
                        SDPSTATS_COUNTER_INC(zcopy_tx_error);
                } else if (ssk->qp_active) {
                        sdp_post_srcavail_cancel(sk);

                        /* Wait for RdmaRdCompl/SendSM to
                         * finish the transaction */
                        *timeo = 2 * HZ;
                        sdp_dbg_data(sk, "Waiting for SendSM\n");
                        sdp_wait_rdmardcompl(ssk, timeo, 1);
                        sdp_dbg_data(sk, "finished waiting\n");

                        cancel_delayed_work(&ssk->srcavail_cancel_work);
                } else {
                        sdp_dbg_data(sk, "QP was destroyed while waiting\n");
                }
        } else {
                sdp_dbg_data(sk, "got RdmaRdCompl\n");
        }

        spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
        ssk->tx_sa = NULL;
        spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

err_abort_send:
        sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

err_no_tx_slots:
        sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

err_alloc_fmr:
        return rc;
}
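/*
 * Zero-copy sendmsg entry point: repeatedly advertises the iovec through
 * do_sdp_sendmsg_zcopy() until it is consumed, an abort flag is raised, or
 * the residue drops below sdp_zcopy_thresh (in which case the caller is
 * expected to fall back to bcopy). Returns the number of bytes consumed
 * from the iovec, or a fatal error code.
 */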
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        int rc = 0;
        long timeo;
        struct tx_srcavail_state *tx_sa;
        int offset;
        size_t bytes_to_copy = 0;
        int copied = 0;

        sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
                        iov->iov_base, iov->iov_len);
        sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
        if (ssk->rx_sa) {
                sdp_dbg_data(sk, "Deadlock prevention: crossing SrcAvail\n");
                return 0;
        }

        sock_hold(ssk->socket, SOCK_REF_ZCOPY);

        SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);

        timeo = SDP_SRCAVAIL_ADV_TIMEOUT;

        /* Ok, commence sending. */
        offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);

        tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
        if (!tx_sa) {
                sdp_warn(sk, "Error allocating zcopy context\n");
                rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
                goto err_alloc_tx_sa;
        }

        bytes_to_copy = iov->iov_len;
        do {
                tx_sa_reset(tx_sa);

                rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);

                if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
                        sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
                                iov->iov_len);
                        break;
                }
        } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

        kfree(tx_sa);
err_alloc_tx_sa:
        copied = bytes_to_copy - iov->iov_len;

        sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);

        sock_put(ssk->socket, SOCK_REF_ZCOPY);

        if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
                return rc;

        return copied;
}

void sdp_abort_srcavail(struct socket *sk)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct tx_srcavail_state *tx_sa = ssk->tx_sa;
        unsigned long flags;

        if (!tx_sa)
                return;

        cancel_delayed_work(&ssk->srcavail_cancel_work);
        flush_scheduled_work();

        spin_lock_irqsave(&ssk->tx_sa_lock, flags);

        sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

        ssk->tx_sa = NULL;

        spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

void sdp_abort_rdma_read(struct socket *sk)
{
        struct sdp_sock *ssk = sdp_sk(sk);
        struct rx_srcavail_state *rx_sa = ssk->rx_sa;

        if (!rx_sa)
                return;

        sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

        ssk->rx_sa = NULL;
}