1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0 3 * 4 * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the 10 * OpenIB.org BSD license below: 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above 17 * copyright notice, this list of conditions and the following 18 * disclaimer. 19 * 20 * - Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials 23 * provided with the distribution. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32 * SOFTWARE. 33 */ 34 #include <linux/tcp.h> 35 #include <asm/ioctls.h> 36 #include <linux/workqueue.h> 37 #include <linux/net.h> 38 #include <linux/socket.h> 39 #include <net/protocol.h> 40 #include <net/inet_common.h> 41 #include <rdma/rdma_cm.h> 42 #include <rdma/ib_verbs.h> 43 #include <rdma/ib_fmr_pool.h> 44 #include <rdma/ib_umem.h> 45 #include <net/tcp.h> /* for memcpy_toiovec */ 46 #include <asm/io.h> 47 #include <asm/uaccess.h> 48 #include <linux/delay.h> 49 #include "sdp.h" 50 51 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa) 52 { 53 struct sdp_sock *ssk = sdp_sk(sk); 54 struct mbuf *mb; 55 int payload_len; 56 struct page *payload_pg; 57 int off, len; 58 struct ib_umem_chunk *chunk; 59 60 WARN_ON(ssk->tx_sa); 61 62 BUG_ON(!tx_sa); 63 BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey); 64 BUG_ON(!tx_sa->umem); 65 BUG_ON(!tx_sa->umem->chunk_list.next); 66 67 chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list); 68 BUG_ON(!chunk->nmap); 69 70 off = tx_sa->umem->offset; 71 len = tx_sa->umem->length; 72 73 tx_sa->bytes_sent = tx_sa->bytes_acked = 0; 74 75 mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0); 76 if (!mb) { 77 return -ENOMEM; 78 } 79 sdp_dbg_data(sk, "sending SrcAvail\n"); 80 81 TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb 82 * but continue to live after mb is freed */ 83 ssk->tx_sa = tx_sa; 84 85 /* must have payload inlined in SrcAvail packet in combined mode */ 86 payload_len = MIN(tx_sa->umem->page_size - off, len); 87 payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah)); 88 payload_pg = sg_page(&chunk->page_list[0]); 89 get_page(payload_pg); 90 91 sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n", 92 off, payload_pg, payload_len); 93 94 mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags, 95 payload_pg, off, payload_len); 96 97 mb->len += payload_len; 98 mb->data_len = payload_len; 99 mb->truesize += payload_len; 100 // sk->sk_wmem_queued += payload_len; 101 // sk->sk_forward_alloc -= payload_len; 102 103 mb_entail(sk, ssk, mb); 104 105 ssk->write_seq += payload_len; 106 SDP_SKB_CB(mb)->end_seq += payload_len; 107 108 tx_sa->bytes_sent = tx_sa->umem->length; 109 tx_sa->bytes_acked = payload_len; 110 111 /* TODO: pushing the mb into the tx_queue should be enough */ 112 113 return 0; 114 } 115 116 static int sdp_post_srcavail_cancel(struct socket *sk) 117 { 118 struct sdp_sock *ssk = sdp_sk(sk); 119 struct mbuf *mb; 120 121 sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n"); 122 123 mb = sdp_alloc_mb_srcavail_cancel(sk, 0); 124 mb_entail(sk, ssk, mb); 125 126 sdp_post_sends(ssk, 0); 127 128 schedule_delayed_work(&ssk->srcavail_cancel_work, 129 SDP_SRCAVAIL_CANCEL_TIMEOUT); 130 131 return 0; 132 } 133 134 void srcavail_cancel_timeout(struct work_struct *work) 135 { 136 struct sdp_sock *ssk = 137 container_of(work, struct sdp_sock, srcavail_cancel_work.work); 138 struct socket *sk = ssk->socket; 139 140 lock_sock(sk); 141 142 sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout." 143 " closing connection\n"); 144 sdp_set_error(sk, -ECONNRESET); 145 wake_up(&ssk->wq); 146 147 release_sock(sk); 148 } 149 150 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, 151 int ignore_signals) 152 { 153 struct socket *sk = ssk->socket; 154 int err = 0; 155 long vm_wait = 0; 156 long current_timeo = *timeo_p; 157 struct tx_srcavail_state *tx_sa = ssk->tx_sa; 158 DEFINE_WAIT(wait); 159 160 sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p); 161 sdp_prf1(sk, NULL, "Going to sleep"); 162 while (ssk->qp_active) { 163 prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE); 164 165 if (unlikely(!*timeo_p)) { 166 err = -ETIME; 167 tx_sa->abort_flags |= TX_SA_TIMEDOUT; 168 sdp_prf1(sk, NULL, "timeout"); 169 SDPSTATS_COUNTER_INC(zcopy_tx_timeout); 170 break; 171 } 172 173 else if (tx_sa->bytes_acked > tx_sa->bytes_sent) { 174 err = -EINVAL; 175 sdp_dbg_data(sk, "acked bytes > sent bytes\n"); 176 tx_sa->abort_flags |= TX_SA_ERROR; 177 break; 178 } 179 180 if (tx_sa->abort_flags & TX_SA_SENDSM) { 181 sdp_prf1(sk, NULL, "Aborting SrcAvail sending"); 182 SDPSTATS_COUNTER_INC(zcopy_tx_aborted); 183 err = -EAGAIN; 184 break ; 185 } 186 187 if (!ignore_signals) { 188 if (signal_pending(current)) { 189 err = -EINTR; 190 sdp_prf1(sk, NULL, "signalled"); 191 tx_sa->abort_flags |= TX_SA_INTRRUPTED; 192 break; 193 } 194 195 if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) { 196 sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n"); 197 tx_sa->abort_flags |= TX_SA_CROSS_SEND; 198 SDPSTATS_COUNTER_INC(zcopy_cross_send); 199 err = -ETIME; 200 break ; 201 } 202 } 203 204 posts_handler_put(ssk); 205 206 sk_wait_event(sk, ¤t_timeo, 207 tx_sa->abort_flags && 208 ssk->rx_sa && 209 (tx_sa->bytes_acked < tx_sa->bytes_sent) && 210 vm_wait); 211 sdp_dbg_data(ssk->socket, "woke up sleepers\n"); 212 213 posts_handler_get(ssk); 214 215 if (tx_sa->bytes_acked == tx_sa->bytes_sent) 216 break; 217 218 if (vm_wait) { 219 vm_wait -= current_timeo; 220 current_timeo = *timeo_p; 221 if (current_timeo != MAX_SCHEDULE_TIMEOUT && 222 (current_timeo -= vm_wait) < 0) 223 current_timeo = 0; 224 vm_wait = 0; 225 } 226 *timeo_p = current_timeo; 227 } 228 229 finish_wait(sk->sk_sleep, &wait); 230 231 sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n", 232 tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags); 233 234 if (!ssk->qp_active) { 235 sdp_dbg(sk, "QP destroyed while waiting\n"); 236 return -EINVAL; 237 } 238 return err; 239 } 240 241 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk) 242 { 243 struct socket *sk = ssk->socket; 244 long timeo = HZ * 5; /* Timeout for for RDMA read */ 245 DEFINE_WAIT(wait); 246 247 sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n"); 248 while (1) { 249 prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE); 250 251 if (!ssk->tx_ring.rdma_inflight->busy) { 252 sdp_dbg_data(sk, "got rdma cqe\n"); 253 break; 254 } 255 256 if (!ssk->qp_active) { 257 sdp_dbg_data(sk, "QP destroyed\n"); 258 break; 259 } 260 261 if (!timeo) { 262 sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n"); 263 WARN_ON(1); 264 break; 265 } 266 267 posts_handler_put(ssk); 268 269 sdp_prf1(sk, NULL, "Going to sleep"); 270 sk_wait_event(sk, &timeo, 271 !ssk->tx_ring.rdma_inflight->busy); 272 sdp_prf1(sk, NULL, "Woke up"); 273 sdp_dbg_data(ssk->socket, "woke up sleepers\n"); 274 275 posts_handler_get(ssk); 276 } 277 278 finish_wait(sk->sk_sleep, &wait); 279 280 sdp_dbg_data(sk, "Finished waiting\n"); 281 } 282 283 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, 284 struct rx_srcavail_state *rx_sa) 285 { 286 struct mbuf *mb; 287 int copied = rx_sa->used - rx_sa->reported; 288 289 if (rx_sa->used <= rx_sa->reported) 290 return 0; 291 292 mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0); 293 294 rx_sa->reported += copied; 295 296 /* TODO: What if no tx_credits available? */ 297 sdp_post_send(ssk, mb); 298 299 return 0; 300 } 301 302 int sdp_post_sendsm(struct socket *sk) 303 { 304 struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0); 305 306 sdp_post_send(sdp_sk(sk), mb); 307 308 return 0; 309 } 310 311 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len) 312 { 313 sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len); 314 while (len > 0) { 315 if (iov->iov_len) { 316 int copy = min_t(unsigned int, iov->iov_len, len); 317 len -= copy; 318 iov->iov_len -= copy; 319 iov->iov_base += copy; 320 } 321 iov++; 322 } 323 324 return 0; 325 } 326 327 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt) 328 { 329 int bytes = 0; 330 331 while (sge_cnt > 0) { 332 bytes += sge->length; 333 sge++; 334 sge_cnt--; 335 } 336 337 return bytes; 338 } 339 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack) 340 { 341 struct socket *sk = ssk->socket; 342 unsigned long flags; 343 344 spin_lock_irqsave(&ssk->tx_sa_lock, flags); 345 346 if (!ssk->tx_sa) { 347 sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail"); 348 goto out; 349 } 350 351 if (ssk->tx_sa->mseq > mseq_ack) { 352 sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. " 353 "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n", 354 mseq_ack, ssk->tx_sa->mseq); 355 goto out; 356 } 357 358 sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n"); 359 360 ssk->tx_sa->abort_flags |= TX_SA_SENDSM; 361 cancel_delayed_work(&ssk->srcavail_cancel_work); 362 363 wake_up(sk->sk_sleep); 364 sdp_dbg_data(sk, "woke up sleepers\n"); 365 366 out: 367 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags); 368 } 369 370 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack, 371 u32 bytes_completed) 372 { 373 struct socket *sk = ssk->socket; 374 unsigned long flags; 375 376 sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa); 377 sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa); 378 379 spin_lock_irqsave(&ssk->tx_sa_lock, flags); 380 381 BUG_ON(!ssk); 382 383 if (!ssk->tx_sa) { 384 sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n"); 385 goto out; 386 } 387 388 if (ssk->tx_sa->mseq > mseq_ack) { 389 sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. " 390 "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n", 391 mseq_ack, ssk->tx_sa->mseq); 392 goto out; 393 } 394 395 ssk->tx_sa->bytes_acked += bytes_completed; 396 397 wake_up(sk->sk_sleep); 398 sdp_dbg_data(sk, "woke up sleepers\n"); 399 400 out: 401 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags); 402 return; 403 } 404 405 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset) 406 { 407 unsigned long avail; 408 unsigned long lock_limit; 409 410 if (capable(CAP_IPC_LOCK)) 411 return ULONG_MAX; 412 413 lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur; 414 avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT); 415 416 return avail - offset; 417 } 418 419 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len, 420 struct ib_pool_fmr **_fmr, struct ib_umem **_umem) 421 { 422 struct ib_pool_fmr *fmr; 423 struct ib_umem *umem; 424 struct ib_device *dev; 425 u64 *pages; 426 struct ib_umem_chunk *chunk; 427 int n, j, k; 428 int rc = 0; 429 unsigned long max_lockable_bytes; 430 431 if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) { 432 sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n", 433 len, SDP_MAX_RDMA_READ_LEN); 434 len = SDP_MAX_RDMA_READ_LEN; 435 } 436 437 max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK); 438 if (unlikely(len > max_lockable_bytes)) { 439 sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n", 440 len, max_lockable_bytes); 441 len = max_lockable_bytes; 442 } 443 444 sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n", 445 uaddr, len, max_lockable_bytes); 446 447 umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len, 448 IB_ACCESS_REMOTE_WRITE, 0); 449 450 if (IS_ERR(umem)) { 451 rc = PTR_ERR(umem); 452 sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc); 453 sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n", 454 current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur, 455 current->signal->rlim[RLIMIT_MEMLOCK].rlim_max, 456 capable(CAP_IPC_LOCK)); 457 goto err_umem_get; 458 } 459 460 sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n", 461 umem->offset, umem->length); 462 463 pages = (u64 *) __get_free_page(GFP_KERNEL); 464 if (!pages) 465 goto err_pages_alloc; 466 467 n = 0; 468 469 dev = sdp_sk(sk)->ib_device; 470 list_for_each_entry(chunk, &umem->chunk_list, list) { 471 for (j = 0; j < chunk->nmap; ++j) { 472 len = ib_sg_dma_len(dev, 473 &chunk->page_list[j]) >> PAGE_SHIFT; 474 475 for (k = 0; k < len; ++k) { 476 pages[n++] = ib_sg_dma_address(dev, 477 &chunk->page_list[j]) + 478 umem->page_size * k; 479 480 } 481 } 482 } 483 484 fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0); 485 if (IS_ERR(fmr)) { 486 sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr)); 487 goto err_fmr_alloc; 488 } 489 490 free_page((unsigned long) pages); 491 492 *_umem = umem; 493 *_fmr = fmr; 494 495 return 0; 496 497 err_fmr_alloc: 498 free_page((unsigned long) pages); 499 500 err_pages_alloc: 501 ib_umem_release(umem); 502 503 err_umem_get: 504 505 return rc; 506 } 507 508 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem) 509 { 510 if (!sdp_sk(sk)->qp_active) 511 return; 512 513 ib_fmr_pool_unmap(*_fmr); 514 *_fmr = NULL; 515 516 ib_umem_release(*_umem); 517 *_umem = NULL; 518 } 519 520 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa) 521 { 522 struct sdp_sock *ssk = sdp_sk(sk); 523 struct ib_send_wr *bad_wr; 524 struct ib_send_wr wr = { NULL }; 525 struct ib_sge sge; 526 527 wr.opcode = IB_WR_RDMA_READ; 528 wr.next = NULL; 529 wr.wr_id = SDP_OP_RDMA; 530 wr.wr.rdma.rkey = rx_sa->rkey; 531 wr.send_flags = 0; 532 533 ssk->tx_ring.rdma_inflight = rx_sa; 534 535 sge.addr = rx_sa->umem->offset; 536 sge.length = rx_sa->umem->length; 537 sge.lkey = rx_sa->fmr->fmr->lkey; 538 539 wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used; 540 wr.num_sge = 1; 541 wr.sg_list = &sge; 542 rx_sa->busy++; 543 544 wr.send_flags = IB_SEND_SIGNALED; 545 546 return ib_post_send(ssk->qp, &wr, &bad_wr); 547 } 548 549 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb, 550 unsigned long *used) 551 { 552 struct sdp_sock *ssk = sdp_sk(sk); 553 struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb); 554 int got_srcavail_cancel; 555 int rc = 0; 556 int len = *used; 557 int copied; 558 559 sdp_dbg_data(ssk->socket, "preparing RDMA read." 560 " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len); 561 562 sock_hold(sk, SOCK_REF_RDMA_RD); 563 564 if (len > rx_sa->len) { 565 sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len); 566 WARN_ON(1); 567 len = rx_sa->len; 568 } 569 570 rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem); 571 if (rc) { 572 sdp_warn(sk, "Error allocating fmr: %d\n", rc); 573 goto err_alloc_fmr; 574 } 575 576 rc = sdp_post_rdma_read(sk, rx_sa); 577 if (unlikely(rc)) { 578 sdp_warn(sk, "ib_post_send failed with status %d.\n", rc); 579 sdp_set_error(ssk->socket, -ECONNRESET); 580 wake_up(&ssk->wq); 581 goto err_post_send; 582 } 583 584 sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc); 585 586 got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq; 587 588 sdp_arm_tx_cq(sk); 589 590 sdp_wait_rdma_wr_finished(ssk); 591 592 sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc); 593 if (!ssk->qp_active) { 594 sdp_dbg_data(sk, "QP destroyed during RDMA read\n"); 595 rc = -EPIPE; 596 goto err_post_send; 597 } 598 599 copied = rx_sa->umem->length; 600 601 sdp_update_iov_used(sk, iov, copied); 602 rx_sa->used += copied; 603 atomic_add(copied, &ssk->rcv_nxt); 604 *used = copied; 605 606 ssk->tx_ring.rdma_inflight = NULL; 607 608 err_post_send: 609 sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem); 610 611 err_alloc_fmr: 612 if (rc && ssk->qp_active) { 613 sdp_warn(sk, "Couldn't do RDMA - post sendsm\n"); 614 rx_sa->flags |= RX_SA_ABORTED; 615 } 616 617 sock_put(sk, SOCK_REF_RDMA_RD); 618 619 return rc; 620 } 621 622 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p) 623 { 624 struct sdp_sock *ssk = sdp_sk(sk); 625 int ret = 0; 626 int credits_needed = 1; 627 628 sdp_dbg_data(sk, "Wait for mem\n"); 629 630 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 631 632 SDPSTATS_COUNTER_INC(send_wait_for_mem); 633 634 sdp_do_posts(ssk); 635 636 sdp_xmit_poll(ssk, 1); 637 638 ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed); 639 640 return ret; 641 } 642 643 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa, 644 struct iovec *iov, long *timeo) 645 { 646 struct sdp_sock *ssk = sdp_sk(sk); 647 int rc = 0; 648 unsigned long lock_flags; 649 650 rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len, 651 &tx_sa->fmr, &tx_sa->umem); 652 if (rc) { 653 sdp_warn(sk, "Error allocating fmr: %d\n", rc); 654 goto err_alloc_fmr; 655 } 656 657 if (tx_slots_free(ssk) == 0) { 658 rc = wait_for_sndbuf(sk, timeo); 659 if (rc) { 660 sdp_warn(sk, "Couldn't get send buffer\n"); 661 goto err_no_tx_slots; 662 } 663 } 664 665 rc = sdp_post_srcavail(sk, tx_sa); 666 if (rc) { 667 sdp_dbg(sk, "Error posting SrcAvail\n"); 668 goto err_abort_send; 669 } 670 671 rc = sdp_wait_rdmardcompl(ssk, timeo, 0); 672 if (unlikely(rc)) { 673 enum tx_sa_flag f = tx_sa->abort_flags; 674 675 if (f & TX_SA_SENDSM) { 676 sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n"); 677 } else if (f & TX_SA_ERROR) { 678 sdp_dbg_data(sk, "SrcAvail error completion\n"); 679 sdp_reset(sk); 680 SDPSTATS_COUNTER_INC(zcopy_tx_error); 681 } else if (ssk->qp_active) { 682 sdp_post_srcavail_cancel(sk); 683 684 /* Wait for RdmaRdCompl/SendSM to 685 * finish the transaction */ 686 *timeo = 2 * HZ; 687 sdp_dbg_data(sk, "Waiting for SendSM\n"); 688 sdp_wait_rdmardcompl(ssk, timeo, 1); 689 sdp_dbg_data(sk, "finished waiting\n"); 690 691 cancel_delayed_work(&ssk->srcavail_cancel_work); 692 } else { 693 sdp_dbg_data(sk, "QP was destroyed while waiting\n"); 694 } 695 } else { 696 sdp_dbg_data(sk, "got RdmaRdCompl\n"); 697 } 698 699 spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags); 700 ssk->tx_sa = NULL; 701 spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags); 702 703 err_abort_send: 704 sdp_update_iov_used(sk, iov, tx_sa->bytes_acked); 705 706 err_no_tx_slots: 707 sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem); 708 709 err_alloc_fmr: 710 return rc; 711 } 712 713 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov) 714 { 715 struct sdp_sock *ssk = sdp_sk(sk); 716 int rc = 0; 717 long timeo; 718 struct tx_srcavail_state *tx_sa; 719 int offset; 720 size_t bytes_to_copy = 0; 721 int copied = 0; 722 723 sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n", 724 iov->iov_base, iov->iov_len); 725 sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start"); 726 if (ssk->rx_sa) { 727 sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n"); 728 return 0; 729 } 730 731 sock_hold(ssk->socket, SOCK_REF_ZCOPY); 732 733 SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment); 734 735 timeo = SDP_SRCAVAIL_ADV_TIMEOUT ; 736 737 /* Ok commence sending. */ 738 offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1); 739 740 tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL); 741 if (!tx_sa) { 742 sdp_warn(sk, "Error allocating zcopy context\n"); 743 rc = -EAGAIN; /* Buffer too big - fallback to bcopy */ 744 goto err_alloc_tx_sa; 745 } 746 747 bytes_to_copy = iov->iov_len; 748 do { 749 tx_sa_reset(tx_sa); 750 751 rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo); 752 753 if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) { 754 sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n", 755 iov->iov_len); 756 break; 757 } 758 } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags); 759 760 kfree(tx_sa); 761 err_alloc_tx_sa: 762 copied = bytes_to_copy - iov->iov_len; 763 764 sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied); 765 766 sock_put(ssk->socket, SOCK_REF_ZCOPY); 767 768 if (rc < 0 && rc != -EAGAIN && rc != -ETIME) 769 return rc; 770 771 return copied; 772 } 773 774 void sdp_abort_srcavail(struct socket *sk) 775 { 776 struct sdp_sock *ssk = sdp_sk(sk); 777 struct tx_srcavail_state *tx_sa = ssk->tx_sa; 778 unsigned long flags; 779 780 if (!tx_sa) 781 return; 782 783 cancel_delayed_work(&ssk->srcavail_cancel_work); 784 flush_scheduled_work(); 785 786 spin_lock_irqsave(&ssk->tx_sa_lock, flags); 787 788 sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem); 789 790 ssk->tx_sa = NULL; 791 792 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags); 793 } 794 795 void sdp_abort_rdma_read(struct socket *sk) 796 { 797 struct sdp_sock *ssk = sdp_sk(sk); 798 struct rx_srcavail_state *rx_sa = ssk->rx_sa; 799 800 if (!rx_sa) 801 return; 802 803 sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem); 804 805 ssk->rx_sa = NULL; 806 } 807