/*
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
    "Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
    "Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

    sdp_dbg(ssk->socket, "%s\n", __func__);

    SDP_WLOCK_ASSERT(ssk);
    if (TCPS_HAVERCVDFIN(ssk->state) == 0)
        socantrcvmore(ssk->socket);

    switch (ssk->state) {
    case TCPS_SYN_RECEIVED:
    case TCPS_ESTABLISHED:
        ssk->state = TCPS_CLOSE_WAIT;
        break;

    case TCPS_FIN_WAIT_1:
        /* Received a reply FIN - start Infiniband tear down */
        sdp_dbg(ssk->socket,
            "%s: Starting Infiniband tear down sending DREQ\n",
            __func__);

        sdp_cancel_dreq_wait_timeout(ssk);
        ssk->qp_active = 0;
        if (ssk->id) {
            struct rdma_cm_id *id;

            id = ssk->id;
            SDP_WUNLOCK(ssk);
            rdma_disconnect(id);
            SDP_WLOCK(ssk);
        } else {
            sdp_warn(ssk->socket,
                "%s: ssk->id is NULL\n", __func__);
            return;
        }
        break;
    case TCPS_TIME_WAIT:
        /* This is a mutual close situation and we've got the DREQ from
           the peer before the SDP_MID_DISCONNECT */
        break;
    case TCPS_CLOSED:
        /* FIN arrived after IB teardown started - do nothing */
        sdp_dbg(ssk->socket, "%s: fin in state %s\n",
            __func__, sdp_state_str(ssk->state));
        return;
    default:
        sdp_warn(ssk->socket,
            "%s: FIN in unexpected state. state=%d\n",
            __func__, ssk->state);
        break;
    }
}
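
/*
 * State transitions performed by sdp_handle_disconn() above:
 *
 *   SYN_RECEIVED, ESTABLISHED -> CLOSE_WAIT (passive close begins)
 *   FIN_WAIT_1                -> reply FIN; start IB teardown and send a
 *                                DREQ via rdma_disconnect()
 *   TIME_WAIT                 -> mutual close, DREQ arrived before the
 *                                SDP_MID_DISCONNECT; nothing to do
 *   CLOSED                    -> IB teardown already started; nothing to do
 *
 * The socket lock is dropped around rdma_disconnect(), presumably so that
 * CM callbacks which take the same lock are not entered with it held.
 */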

static int
sdp_post_recv(struct sdp_sock *ssk)
{
    struct sdp_buf *rx_req;
    int i, rc;
    u64 addr;
    struct ib_device *dev;
    struct ib_recv_wr rx_wr = { NULL };
    struct ib_sge ibsge[SDP_MAX_RECV_SGES];
    struct ib_sge *sge = ibsge;
    struct ib_recv_wr *bad_wr;
    struct mbuf *mb, *m;
    struct sdp_bsdh *h;
    int id = ring_head(ssk->rx_ring);

    /* Now, allocate and repost recv */
    sdp_prf(ssk->socket, mb, "Posting mb");
    mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
    if (mb == NULL) {
        /* Retry so we can't stall out with no memory. */
        if (!rx_ring_posted(ssk))
            queue_work(rx_comp_wq, &ssk->rx_comp_work);
        return -1;
    }
    for (m = mb; m != NULL; m = m->m_next) {
        m->m_len = (m->m_flags & M_EXT) ? m->m_ext.ext_size :
            ((m->m_flags & M_PKTHDR) ? MHLEN : MLEN);
        mb->m_pkthdr.len += m->m_len;
    }
    h = mtod(mb, struct sdp_bsdh *);
    rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
    rx_req->mb = mb;
    dev = ssk->ib_device;
    for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
        addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
            DMA_TO_DEVICE);
        /* TODO: proper error handling */
        BUG_ON(ib_dma_mapping_error(dev, addr));
        BUG_ON(i >= SDP_MAX_RECV_SGES);
        rx_req->mapping[i] = addr;
        sge->addr = addr;
        sge->length = mb->m_len;
        sge->lkey = ssk->sdp_dev->mr->lkey;
    }

    rx_wr.next = NULL;
    rx_wr.wr_id = id | SDP_OP_RECV;
    rx_wr.sg_list = ibsge;
    rx_wr.num_sge = i;
    rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
    if (unlikely(rc)) {
        sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

        sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
        m_freem(mb);

        sdp_notify(ssk, ECONNRESET);

        return -1;
    }

    atomic_inc(&ssk->rx_ring.head);
    SDPSTATS_COUNTER_INC(post_recv);

    return 0;
}

static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
    unsigned long bytes_in_process;
    unsigned long max_bytes;
    int buffer_size;
    int posted;

    if (!ssk->qp_active || !ssk->socket)
        return 0;

    posted = rx_ring_posted(ssk);
    if (posted >= SDP_RX_SIZE)
        return 0;
    if (posted < SDP_MIN_TX_CREDITS)
        return 1;

    buffer_size = ssk->recv_bytes;
    max_bytes = max(ssk->socket->so_snd.sb_hiwat,
        (1 + SDP_MIN_TX_CREDITS) * buffer_size);
    max_bytes *= rcvbuf_scale;
    /*
     * Compute bytes in the receive queue and socket buffer.
     */
    bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
    bytes_in_process += ssk->socket->so_rcv.sb_cc;

    return bytes_in_process < max_bytes;
}
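
/*
 * The check above throttles receive posting. With the module defaults in
 * this file (recv_bytes capped at SDP_MAX_PACKET, initial size 32KB,
 * rcvbuf_scale 8) the budget works out roughly as below; the concrete
 * values of SDP_MIN_TX_CREDITS and sb_hiwat come from sdp.h and the
 * socket, so the numbers here are only illustrative:
 *
 *   max_bytes        = max(so_snd.sb_hiwat,
 *                          (1 + SDP_MIN_TX_CREDITS) * recv_bytes)
 *                      * rcvbuf_scale
 *   bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * recv_bytes
 *                      + so_rcv.sb_cc
 *
 * Receives keep being posted while bytes_in_process < max_bytes, while a
 * floor of SDP_MIN_TX_CREDITS buffers is always kept posted and the ring
 * never exceeds SDP_RX_SIZE entries.
 */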

static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

    while (sdp_post_recvs_needed(ssk))
        if (sdp_post_recv(ssk))
            return;
}

static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
    struct sdp_sock *ssk = sdp_sk(sk);
    struct sdp_bsdh *h;

    h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
    SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
    if (h->mid == SDP_MID_SRCAVAIL) {
        struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
        struct rx_srcavail_state *rx_sa;

        ssk->srcavail_cancel_mseq = 0;

        ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
            sizeof(struct rx_srcavail_state), M_NOWAIT);

        rx_sa->mseq = ntohl(h->mseq);
        rx_sa->used = 0;
        rx_sa->len = mb_len = ntohl(srcah->len);
        rx_sa->rkey = ntohl(srcah->rkey);
        rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
        rx_sa->flags = 0;

        if (ssk->tx_sa) {
            sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
                "for TX SrcAvail. waking up TX SrcAvail "
                "to be aborted\n");
            wake_up(sk->sk_sleep);
        }

        atomic_add(mb->len, &ssk->rcv_nxt);
        sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
            mb_len, rx_sa->vaddr);
    } else
#endif
    {
        atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
    }

    m_adj(mb, SDP_HEAD_SIZE);
    SOCKBUF_LOCK(&sk->so_rcv);
    if (unlikely(h->flags & SDP_OOB_PRES))
        sdp_urg(ssk, mb);
    sbappend_locked(&sk->so_rcv, mb);
    sorwakeup_locked(sk);
    return mb;
}

static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

    return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

    ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
    sdp_post_recvs(ssk);

    return 0;
}

int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
    u32 curr_size = ssk->recv_bytes;
    u32 max_size = SDP_MAX_PACKET;

    if (new_size > curr_size && new_size <= max_size) {
        ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
        return 0;
    }
    return -1;
}

static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
    if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
        ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
    else
        ssk->recv_request_head = ring_tail(ssk->rx_ring);
    ssk->recv_request = 1;
}

static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
    u32 new_size = ntohl(buf->size);

    if (new_size > ssk->xmit_size_goal)
        ssk->xmit_size_goal = new_size;
}

static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
    struct sdp_buf *rx_req;
    struct ib_device *dev;
    struct mbuf *mb;

    if (unlikely(id != ring_tail(ssk->rx_ring))) {
        printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
            id, ring_tail(ssk->rx_ring));
        return NULL;
    }

    dev = ssk->ib_device;
    rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
    mb = rx_req->mb;
    sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

    atomic_inc(&ssk->rx_ring.tail);
    atomic_dec(&ssk->remote_credits);
    return mb;
}
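
/*
 * The rx ring is a power-of-two array indexed by free-running head and
 * tail counters, so a work request id maps to a slot through
 * "id & (SDP_RX_SIZE - 1)" and completions are expected strictly in order
 * (id == ring_tail). Assuming SDP_RX_SIZE were 64, for example, ids 0 and
 * 64 would share slot 0 and only the counter value carried in the wr_id
 * tells them apart. Each completed receive also consumes one of the
 * buffers advertised to the peer, hence the remote_credits decrement.
 */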

/* socket lock should be taken before calling this */
static int
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
    struct sdp_bsdh *h;
    struct socket *sk;

    SDP_WLOCK_ASSERT(ssk);
    sk = ssk->socket;
    h = mtod(mb, struct sdp_bsdh *);
    switch (h->mid) {
    case SDP_MID_DATA:
    case SDP_MID_SRCAVAIL:
        sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

        /* got data in RCV_SHUTDOWN */
        if (ssk->state == TCPS_FIN_WAIT_1) {
            sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
            sdp_notify(ssk, ECONNRESET);
        }
        m_freem(mb);
        break;
#ifdef SDP_ZCOPY
    case SDP_MID_RDMARDCOMPL:
        m_freem(mb);
        break;
    case SDP_MID_SENDSM:
        sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
        m_freem(mb);
        break;
    case SDP_MID_SRCAVAIL_CANCEL:
        sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
        sdp_prf(sk, NULL, "Handling SrcAvailCancel");
        if (ssk->rx_sa) {
            ssk->srcavail_cancel_mseq = ntohl(h->mseq);
            ssk->rx_sa->flags |= RX_SA_ABORTED;
            ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
                                  the dirty logic from recvmsg */
        } else {
            sdp_dbg(sk, "Got SrcAvailCancel - "
                "but no SrcAvail in process\n");
        }
        m_freem(mb);
        break;
    case SDP_MID_SINKAVAIL:
        sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
        sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
        /* FALLTHROUGH */
#endif
    case SDP_MID_ABORT:
        sdp_dbg_data(sk, "Handling ABORT\n");
        sdp_prf(sk, NULL, "Handling ABORT");
        sdp_notify(ssk, ECONNRESET);
        m_freem(mb);
        break;
    case SDP_MID_DISCONN:
        sdp_dbg_data(sk, "Handling DISCONN\n");
        sdp_prf(sk, NULL, "Handling DISCONN");
        sdp_handle_disconn(ssk);
        break;
    case SDP_MID_CHRCVBUF:
        sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
        sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
        m_freem(mb);
        break;
    case SDP_MID_CHRCVBUF_ACK:
        sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
        sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
        m_freem(mb);
        break;
    default:
        /* TODO: Handle other messages */
        sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
        m_freem(mb);
    }

    return 0;
}
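
/*
 * Every BSDH that arrives carries a piggy-backed flow-control update which
 * sdp_process_rx_mb() below applies before looking at the payload:
 *
 *   tx_credits = mseq_ack - ring_head(tx_ring) + 1 + bufs
 *
 * i.e. the peer's advertised receive buffers minus the number of our sends
 * it has not yet acknowledged. Purely illustrative numbers: if our tx ring
 * head is 11 (mseqs up to 10 sent), the header acks mseq 8 and advertises
 * bufs = 6, the two unacked sends leave tx_credits = 8 - 11 + 1 + 6 = 4.
 */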

static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
    struct socket *sk;
    struct sdp_bsdh *h;
    unsigned long mseq_ack;
    int credits_before;

    h = mtod(mb, struct sdp_bsdh *);
    sk = ssk->socket;
    /*
     * If another thread is in so_pcbfree this may be partially torn
     * down but no further synchronization is required as the destroying
     * thread will wait for receive to shutdown before discarding the
     * socket.
     */
    if (sk == NULL) {
        m_freem(mb);
        return 0;
    }

    SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

    mseq_ack = ntohl(h->mseq_ack);
    credits_before = tx_credits(ssk);
    atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
        1 + ntohs(h->bufs));
    if (mseq_ack >= ssk->nagle_last_unacked)
        ssk->nagle_last_unacked = 0;

    sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
        mid2str(h->mid), ntohs(h->bufs), credits_before,
        tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

    if (unlikely(h->mid == SDP_MID_DATA &&
        mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
        /* Credit update is valid even after RCV_SHUTDOWN */
        m_freem(mb);
        return 0;
    }

    if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
        TCPS_HAVERCVDFIN(ssk->state)) {
        sdp_prf(sk, NULL, "Control mb - queuing to control queue");
#ifdef SDP_ZCOPY
        if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
            sdp_dbg_data(sk, "Got SrcAvailCancel. "
                "seq: 0x%d seq_ack: 0x%d\n",
                ntohl(h->mseq), ntohl(h->mseq_ack));
            ssk->srcavail_cancel_mseq = ntohl(h->mseq);
        }

        if (h->mid == SDP_MID_RDMARDCOMPL) {
            struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);

            sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
            sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
                ntohl(rrch->len));
        }
#endif
        mb->m_nextpkt = NULL;
        if (ssk->rx_ctl_tail)
            ssk->rx_ctl_tail->m_nextpkt = mb;
        else
            ssk->rx_ctl_q = mb;
        ssk->rx_ctl_tail = mb;

        return 0;
    }

    sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
    mb = sdp_sock_queue_rcv_mb(sk, mb);

    return 0;
}

/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
    struct mbuf *mb;
    struct sdp_bsdh *h;
    struct socket *sk = ssk->socket;
    int mseq;

    mb = sdp_recv_completion(ssk, wc->wr_id);
    if (unlikely(!mb))
        return NULL;

    if (unlikely(wc->status)) {
        if (ssk->qp_active && sk) {
            sdp_dbg(sk, "Recv completion with error. "
                "Status %d, vendor: %d\n",
                wc->status, wc->vendor_err);
            sdp_abort(sk);
            ssk->qp_active = 0;
        }
        m_freem(mb);
        return NULL;
    }

    sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
        (int)wc->wr_id, wc->byte_len);
    if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
        sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
            wc->byte_len, sizeof(struct sdp_bsdh));
        m_freem(mb);
        return NULL;
    }
    /* Use m_adj to trim the tail of data we didn't use. */
    m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
    h = mtod(mb, struct sdp_bsdh *);

    SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

    ssk->rx_packets++;
    ssk->rx_bytes += mb->m_pkthdr.len;

    mseq = ntohl(h->mseq);
    atomic_set(&ssk->mseq_ack, mseq);
    if (mseq != (int)wc->wr_id)
        sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
            mseq, (int)wc->wr_id);

    return mb;
}
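
/*
 * The receive chain posted in sdp_post_recv() always spans the full
 * ssk->recv_bytes, so the negative m_adj() above trims the unused tail
 * down to the wc->byte_len actually written by the HCA. The wr_id posted
 * with the buffer doubles as the expected message sequence number, which
 * is why a mismatch with the BSDH mseq field is reported as a bug.
 */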

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
    struct socket *sk = ssk->socket;

    if (tx_credits(ssk) >= ssk->min_bufs && sk)
        sowwakeup(sk);
}

/* only from interrupt. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
    struct ib_cq *cq = ssk->rx_ring.cq;
    struct ib_wc ibwc[SDP_NUM_WC];
    int n, i;
    int wc_processed = 0;
    struct mbuf *mb;

    do {
        n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
        for (i = 0; i < n; ++i) {
            struct ib_wc *wc = &ibwc[i];

            BUG_ON(!(wc->wr_id & SDP_OP_RECV));
            mb = sdp_process_rx_wc(ssk, wc);
            if (!mb)
                continue;

            sdp_process_rx_mb(ssk, mb);
            wc_processed++;
        }
    } while (n == SDP_NUM_WC);

    if (wc_processed)
        sdp_bzcopy_write_space(ssk);

    return wc_processed;
}

static void
sdp_rx_comp_work(struct work_struct *work)
{
    struct sdp_sock *ssk = container_of(work, struct sdp_sock,
        rx_comp_work);

    sdp_prf(ssk->socket, NULL, "%s", __func__);

    SDP_WLOCK(ssk);
    if (unlikely(!ssk->qp)) {
        sdp_prf(ssk->socket, NULL, "qp was destroyed");
        goto out;
    }
    if (unlikely(!ssk->rx_ring.cq)) {
        sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
        goto out;
    }

    if (unlikely(!ssk->poll_cq)) {
        struct rdma_cm_id *id = ssk->id;

        if (id && id->qp)
            rdma_notify(id, IB_EVENT_COMM_EST);
        goto out;
    }

    sdp_do_posts(ssk);
out:
    SDP_WUNLOCK(ssk);
}

void
sdp_do_posts(struct sdp_sock *ssk)
{
    struct socket *sk = ssk->socket;
    int xmit_poll_force;
    struct mbuf *mb;

    SDP_WLOCK_ASSERT(ssk);
    if (!ssk->qp_active) {
        sdp_dbg(sk, "QP is deactivated\n");
        return;
    }

    while ((mb = ssk->rx_ctl_q)) {
        ssk->rx_ctl_q = mb->m_nextpkt;
        mb->m_nextpkt = NULL;
        sdp_process_rx_ctl_mb(ssk, mb);
    }

    if (ssk->state == TCPS_TIME_WAIT)
        return;

    if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
        return;

    sdp_post_recvs(ssk);

    if (tx_ring_posted(ssk))
        sdp_xmit_poll(ssk, 1);

    sdp_post_sends(ssk, M_NOWAIT);

    xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

    if (credit_update_needed(ssk) || xmit_poll_force) {
        /* If there are sends pending because we ran out of tx_credits,
           transmit them now. */
        sdp_prf(sk, NULL, "Processing to free pending sends");
        sdp_xmit_poll(ssk, xmit_poll_force);
        sdp_prf(sk, NULL, "Sending credit update");
        sdp_post_sends(ssk, M_NOWAIT);
    }
}
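
/*
 * Receive processing is split in two stages: the CQ callback sdp_rx_irq()
 * below calls sdp_process_rx(), which drains completions via
 * sdp_poll_rx_cq() under the rx ring lock, re-arms the CQ and, if anything
 * was consumed, schedules rx_comp_work. The workqueue handler then takes
 * the socket write lock and finishes the job in sdp_do_posts(): draining
 * the control-message queue, reposting receives and pushing out pending
 * sends and credit updates.
 */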

int
sdp_process_rx(struct sdp_sock *ssk)
{
    int wc_processed = 0;
    int credits_before;

    if (!rx_ring_trylock(&ssk->rx_ring)) {
        sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
        return 0;
    }

    credits_before = tx_credits(ssk);

    wc_processed = sdp_poll_rx_cq(ssk);
    sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

    if (wc_processed) {
        sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
            credits_before, tx_credits(ssk));
        queue_work(rx_comp_wq, &ssk->rx_comp_work);
    }
    sdp_arm_rx_cq(ssk);

    rx_ring_unlock(&ssk->rx_ring);

    return (wc_processed);
}

static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
    struct socket *sk = cq_context;
    struct sdp_sock *ssk = sdp_sk(sk);

    if (cq != ssk->rx_ring.cq) {
        sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
        return;
    }

    SDPSTATS_COUNTER_INC(rx_int_count);

    sdp_prf(sk, NULL, "rx irq");

    sdp_process_rx(ssk);
}

static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{

    while (rx_ring_posted(ssk) > 0) {
        struct mbuf *mb;

        mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
        if (!mb)
            break;
        m_freem(mb);
    }
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{

    ssk->rx_ring.buffer = NULL;
    ssk->rx_ring.destroyed = 0;
    rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
    struct ib_cq *rx_cq;
    int rc = 0;

    sdp_dbg(ssk->socket, "rx ring created");
    INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
    atomic_set(&ssk->rx_ring.head, 1);
    atomic_set(&ssk->rx_ring.tail, 1);

    ssk->rx_ring.buffer = kmalloc(
        sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE, GFP_KERNEL);
    if (!ssk->rx_ring.buffer) {
        sdp_warn(ssk->socket,
            "Unable to allocate RX Ring size %zd.\n",
            sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
        return -ENOMEM;
    }

    rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
        ssk->socket, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
    if (IS_ERR(rx_cq)) {
        rc = PTR_ERR(rx_cq);
        sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
        goto err_cq;
    }

    sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
    sdp_arm_rx_cq(ssk);

    return 0;

err_cq:
    kfree(ssk->rx_ring.buffer);
    ssk->rx_ring.buffer = NULL;
    return rc;
}

void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

    cancel_work_sync(&ssk->rx_comp_work);
    rx_ring_destroy_lock(&ssk->rx_ring);

    if (ssk->rx_ring.buffer) {
        sdp_rx_ring_purge(ssk);

        kfree(ssk->rx_ring.buffer);
        ssk->rx_ring.buffer = NULL;
    }

    if (ssk->rx_ring.cq) {
        if (ib_destroy_cq(ssk->rx_ring.cq)) {
            sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
                ssk->rx_ring.cq);
        } else {
            ssk->rx_ring.cq = NULL;
        }
    }

    WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}