/*
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
	"Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
	"Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONN is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "%s\n", __func__);

	SDP_WLOCK_ASSERT(ssk);
	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
		socantrcvmore(ssk->socket);

	switch (ssk->state) {
	case TCPS_SYN_RECEIVED:
	case TCPS_ESTABLISHED:
		ssk->state = TCPS_CLOSE_WAIT;
		break;

	case TCPS_FIN_WAIT_1:
		/* Received a reply FIN - start Infiniband tear down */
		sdp_dbg(ssk->socket,
		    "%s: Starting Infiniband tear down sending DREQ\n",
		    __func__);

		sdp_cancel_dreq_wait_timeout(ssk);
		ssk->qp_active = 0;
		if (ssk->id) {
			struct rdma_cm_id *id;

			id = ssk->id;
			SDP_WUNLOCK(ssk);
			rdma_disconnect(id);
			SDP_WLOCK(ssk);
		} else {
			sdp_warn(ssk->socket,
			    "%s: ssk->id is NULL\n", __func__);
			return;
		}
		break;
	case TCPS_TIME_WAIT:
		/* This is a mutual close situation and we've got the DREQ from
		   the peer before the SDP_MID_DISCONN */
		break;
	case TCPS_CLOSED:
		/* FIN arrived after IB teardown started - do nothing */
		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
		    __func__, sdp_state_str(ssk->state));
		return;
	default:
		sdp_warn(ssk->socket,
		    "%s: FIN in unexpected state. state=%d\n",
		    __func__, ssk->state);
		break;
	}
}
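/*
 * Allocate a new receive buffer as an mbuf chain sized to ssk->recv_bytes,
 * DMA-map each mbuf and post the chain as a single receive work request at
 * the head of the RX ring.  Returns 0 on success and -1 if allocation or
 * posting fails.
 */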
static int
sdp_post_recv(struct sdp_sock *ssk)
{
	struct sdp_buf *rx_req;
	int i, rc;
	u64 addr;
	struct ib_device *dev;
	struct ib_recv_wr rx_wr = { NULL };
	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
	struct ib_sge *sge = ibsge;
	struct ib_recv_wr *bad_wr;
	struct mbuf *mb, *m;
	struct sdp_bsdh *h;
	int id = ring_head(ssk->rx_ring);

	/* Now, allocate and repost recv */
	sdp_prf(ssk->socket, mb, "Posting mb");
	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
	if (mb == NULL) {
		/* Retry so we don't stall out with no memory. */
		if (!rx_ring_posted(ssk))
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		return -1;
	}
	for (m = mb; m != NULL; m = m->m_next) {
		m->m_len = M_SIZE(m);
		mb->m_pkthdr.len += m->m_len;
	}
	h = mtod(mb, struct sdp_bsdh *);
	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
	rx_req->mb = mb;
	dev = ssk->ib_device;
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_TO_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_RECV_SGES);
		rx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->mr->lkey;
	}

	rx_wr.next = NULL;
	rx_wr.wr_id = id | SDP_OP_RECV;
	rx_wr.sg_list = ibsge;
	rx_wr.num_sge = i;
	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
		m_freem(mb);

		sdp_notify(ssk, ECONNRESET);

		return -1;
	}

	atomic_inc(&ssk->rx_ring.head);
	SDPSTATS_COUNTER_INC(post_recv);

	return 0;
}
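/*
 * Decide whether another receive buffer should be posted: keep at least
 * SDP_MIN_TX_CREDITS buffers outstanding, never exceed SDP_RX_SIZE, and
 * otherwise stop once the bytes held in posted buffers plus the socket
 * receive buffer reach the rcvbuf_scale'd limit.
 */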
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
	unsigned long bytes_in_process;
	unsigned long max_bytes;
	int buffer_size;
	int posted;

	if (!ssk->qp_active || !ssk->socket)
		return 0;

	posted = rx_ring_posted(ssk);
	if (posted >= SDP_RX_SIZE)
		return 0;
	if (posted < SDP_MIN_TX_CREDITS)
		return 1;

	buffer_size = ssk->recv_bytes;
	max_bytes = max(ssk->socket->so_snd.sb_hiwat,
	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
	max_bytes *= rcvbuf_scale;
	/*
	 * Compute bytes in the receive queue and socket buffer.
	 */
	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
	bytes_in_process += sbused(&ssk->socket->so_rcv);

	return bytes_in_process < max_bytes;
}

static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

	while (sdp_post_recvs_needed(ssk))
		if (sdp_post_recv(ssk))
			return;
}

static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct sdp_bsdh *h;

	h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
	if (h->mid == SDP_MID_SRCAVAIL) {
		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
		struct rx_srcavail_state *rx_sa;

		ssk->srcavail_cancel_mseq = 0;

		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
		    sizeof(struct rx_srcavail_state), M_NOWAIT);

		rx_sa->mseq = ntohl(h->mseq);
		rx_sa->used = 0;
		rx_sa->len = mb_len = ntohl(srcah->len);
		rx_sa->rkey = ntohl(srcah->rkey);
		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
		rx_sa->flags = 0;

		if (ssk->tx_sa) {
			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
			    "for TX SrcAvail. waking up TX SrcAvail "
			    "to be aborted\n");
			wake_up(sk->sk_sleep);
		}

		atomic_add(mb->len, &ssk->rcv_nxt);
		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
		    mb_len, rx_sa->vaddr);
	} else
#endif
	{
		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
	}

	m_adj(mb, SDP_HEAD_SIZE);
	SOCKBUF_LOCK(&sk->so_rcv);
	if (unlikely(h->flags & SDP_OOB_PRES))
		sdp_urg(ssk, mb);
	sbappend_locked(&sk->so_rcv, mb, 0);
	sorwakeup_locked(sk);
	return mb;
}

static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

	return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
	sdp_post_recvs(ssk);

	return 0;
}

int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
	u32 curr_size = ssk->recv_bytes;
	u32 max_size = SDP_MAX_PACKET;

	if (new_size > curr_size && new_size <= max_size) {
		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
		return 0;
	}
	return -1;
}

static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
	else
		ssk->recv_request_head = ring_tail(ssk->rx_ring);
	ssk->recv_request = 1;
}

static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	u32 new_size = ntohl(buf->size);

	if (new_size > ssk->xmit_size_goal)
		ssk->xmit_size_goal = new_size;
}
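/*
 * Reclaim the receive buffer that completed at the RX ring tail: unmap its
 * DMA addresses, advance the tail and consume one remote credit.  Returns
 * the mbuf chain, or NULL if the completion id does not match the tail.
 */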
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
	struct sdp_buf *rx_req;
	struct ib_device *dev;
	struct mbuf *mb;

	if (unlikely(id != ring_tail(ssk->rx_ring))) {
		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
		    id, ring_tail(ssk->rx_ring));
		return NULL;
	}

	dev = ssk->ib_device;
	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
	mb = rx_req->mb;
	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

	atomic_inc(&ssk->rx_ring.tail);
	atomic_dec(&ssk->remote_credits);
	return mb;
}

/* socket lock should be taken before calling this */
static int
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_bsdh *h;
	struct socket *sk;

	SDP_WLOCK_ASSERT(ssk);
	sk = ssk->socket;
	h = mtod(mb, struct sdp_bsdh *);
	switch (h->mid) {
	case SDP_MID_DATA:
	case SDP_MID_SRCAVAIL:
		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

		/* got data in RCV_SHUTDOWN */
		if (ssk->state == TCPS_FIN_WAIT_1) {
			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
			sdp_notify(ssk, ECONNRESET);
		}
		m_freem(mb);

		break;
#ifdef SDP_ZCOPY
	case SDP_MID_RDMARDCOMPL:
		m_freem(mb);
		break;
	case SDP_MID_SENDSM:
		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
		m_freem(mb);
		break;
	case SDP_MID_SRCAVAIL_CANCEL:
		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
		if (ssk->rx_sa) {
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
			ssk->rx_sa->flags |= RX_SA_ABORTED;
			ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
					      the dirty logic from recvmsg */
		} else {
			sdp_dbg(sk, "Got SrcAvailCancel - "
			    "but no SrcAvail in process\n");
		}
		m_freem(mb);
		break;
	case SDP_MID_SINKAVAIL:
		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
		/* FALLTHROUGH */
#endif
	case SDP_MID_ABORT:
		sdp_dbg_data(sk, "Handling ABORT\n");
		sdp_prf(sk, NULL, "Handling ABORT");
		sdp_notify(ssk, ECONNRESET);
		m_freem(mb);
		break;
	case SDP_MID_DISCONN:
		sdp_dbg_data(sk, "Handling DISCONN\n");
		sdp_prf(sk, NULL, "Handling DISCONN");
		sdp_handle_disconn(ssk);
		break;
	case SDP_MID_CHRCVBUF:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	case SDP_MID_CHRCVBUF_ACK:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	default:
		/* TODO: Handle other messages */
		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
		m_freem(mb);
	}

	return 0;
}
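/*
 * Process one received packet: refresh the tx credit count from the peer's
 * mseq_ack/bufs fields, drop pure credit updates, queue control messages on
 * rx_ctl_q for sdp_do_posts(), and append data messages to the socket
 * receive buffer.
 */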
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct socket *sk;
	struct sdp_bsdh *h;
	unsigned long mseq_ack;
	int credits_before;

	h = mtod(mb, struct sdp_bsdh *);
	sk = ssk->socket;
	/*
	 * If another thread is in so_pcbfree this may be partially torn
	 * down but no further synchronization is required as the destroying
	 * thread will wait for receive to shutdown before discarding the
	 * socket.
	 */
	if (sk == NULL) {
		m_freem(mb);
		return 0;
	}

	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

	mseq_ack = ntohl(h->mseq_ack);
	credits_before = tx_credits(ssk);
	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
	    1 + ntohs(h->bufs));
	if (mseq_ack >= ssk->nagle_last_unacked)
		ssk->nagle_last_unacked = 0;

	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
	    mid2str(h->mid), ntohs(h->bufs), credits_before,
	    tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

	if (unlikely(h->mid == SDP_MID_DATA &&
	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
		/* Credit update is valid even after RCV_SHUTDOWN */
		m_freem(mb);
		return 0;
	}

	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
	    TCPS_HAVERCVDFIN(ssk->state)) {
		sdp_prf(sk, NULL, "Control mb - queueing to control queue");
#ifdef SDP_ZCOPY
		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
			sdp_dbg_data(sk, "Got SrcAvailCancel. "
			    "seq: 0x%d seq_ack: 0x%d\n",
			    ntohl(h->mseq), ntohl(h->mseq_ack));
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
		}

		if (h->mid == SDP_MID_RDMARDCOMPL) {
			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
			    ntohl(rrch->len));
		}
#endif
		mb->m_nextpkt = NULL;
		if (ssk->rx_ctl_tail)
			ssk->rx_ctl_tail->m_nextpkt = mb;
		else
			ssk->rx_ctl_q = mb;
		ssk->rx_ctl_tail = mb;

		return 0;
	}

	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
	mb = sdp_sock_queue_rcv_mb(sk, mb);

	return 0;
}

/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb;
	struct sdp_bsdh *h;
	struct socket *sk = ssk->socket;
	int mseq;

	mb = sdp_recv_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return NULL;

	if (unlikely(wc->status)) {
		if (ssk->qp_active && sk) {
			sdp_dbg(sk, "Recv completion with error. "
			    "Status %d, vendor: %d\n",
			    wc->status, wc->vendor_err);
			sdp_abort(sk);
			ssk->qp_active = 0;
		}
		m_freem(mb);
		return NULL;
	}

	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
	    (int)wc->wr_id, wc->byte_len);
	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
		    wc->byte_len, sizeof(struct sdp_bsdh));
		m_freem(mb);
		return NULL;
	}
	/* Use m_adj to trim the tail of data we didn't use. */
	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
	h = mtod(mb, struct sdp_bsdh *);

	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

	ssk->rx_packets++;
	ssk->rx_bytes += mb->m_pkthdr.len;

	mseq = ntohl(h->mseq);
	atomic_set(&ssk->mseq_ack, mseq);
	if (mseq != (int)wc->wr_id)
		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
		    mseq, (int)wc->wr_id);

	return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;

	if (tx_credits(ssk) >= ssk->min_bufs && sk)
		sowwakeup(sk);
}

/* only from interrupt. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
	struct ib_cq *cq = ssk->rx_ring.cq;
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;
	struct mbuf *mb;

	do {
		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			struct ib_wc *wc = &ibwc[i];

			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
			mb = sdp_process_rx_wc(ssk, wc);
			if (!mb)
				continue;

			sdp_process_rx_mb(ssk, mb);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed)
		sdp_bzcopy_write_space(ssk);

	return wc_processed;
}
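/*
 * Deferred receive work, run from the rx_comp_wq workqueue: takes the
 * socket write lock and lets sdp_do_posts() drain queued control messages,
 * repost receive buffers and send any pending credit update.
 */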
static void
sdp_rx_comp_work(struct work_struct *work)
{
	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
	    rx_comp_work);

	sdp_prf(ssk->socket, NULL, "%s", __func__);

	SDP_WLOCK(ssk);
	if (unlikely(!ssk->qp)) {
		sdp_prf(ssk->socket, NULL, "qp was destroyed");
		goto out;
	}
	if (unlikely(!ssk->rx_ring.cq)) {
		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
		goto out;
	}

	if (unlikely(!ssk->poll_cq)) {
		struct rdma_cm_id *id = ssk->id;
		if (id && id->qp)
			rdma_notify(id, IB_EVENT_COMM_EST);
		goto out;
	}

	sdp_do_posts(ssk);
out:
	SDP_WUNLOCK(ssk);
}

void
sdp_do_posts(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	int xmit_poll_force;
	struct mbuf *mb;

	SDP_WLOCK_ASSERT(ssk);
	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP is deactivated\n");
		return;
	}

	while ((mb = ssk->rx_ctl_q)) {
		ssk->rx_ctl_q = mb->m_nextpkt;
		mb->m_nextpkt = NULL;
		sdp_process_rx_ctl_mb(ssk, mb);
	}

	if (ssk->state == TCPS_TIME_WAIT)
		return;

	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
		return;

	sdp_post_recvs(ssk);

	if (tx_ring_posted(ssk))
		sdp_xmit_poll(ssk, 1);

	sdp_post_sends(ssk, M_NOWAIT);

	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

	if (credit_update_needed(ssk) || xmit_poll_force) {
		/* If there is pending tx because we ran out of tx_credits, transmit it. */
		sdp_prf(sk, NULL, "Processing to free pending sends");
		sdp_xmit_poll(ssk, xmit_poll_force);
		sdp_prf(sk, NULL, "Sending credit update");
		sdp_post_sends(ssk, M_NOWAIT);
	}
}
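/*
 * Poll the receive CQ under the rx_ring lock, dispatching each completion
 * through sdp_process_rx_mb().  Control-message handling and buffer
 * reposting are deferred to the rx_comp_wq work queue, and the CQ is
 * re-armed before returning the number of completions handled.
 */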
int
sdp_process_rx(struct sdp_sock *ssk)
{
	int wc_processed = 0;
	int credits_before;

	if (!rx_ring_trylock(&ssk->rx_ring)) {
		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
		return 0;
	}

	credits_before = tx_credits(ssk);

	wc_processed = sdp_poll_rx_cq(ssk);
	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

	if (wc_processed) {
		sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
		    credits_before, tx_credits(ssk));
		queue_work(rx_comp_wq, &ssk->rx_comp_work);
	}
	sdp_arm_rx_cq(ssk);

	rx_ring_unlock(&ssk->rx_ring);

	return (wc_processed);
}

static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
	struct socket *sk = cq_context;
	struct sdp_sock *ssk = sdp_sk(sk);

	if (cq != ssk->rx_ring.cq) {
		sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
		return;
	}

	SDPSTATS_COUNTER_INC(rx_int_count);

	sdp_prf(sk, NULL, "rx irq");

	sdp_process_rx(ssk);
}

static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
	while (rx_ring_posted(ssk) > 0) {
		struct mbuf *mb;
		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
	ssk->rx_ring.buffer = NULL;
	ssk->rx_ring.destroyed = 0;
	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq *rx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "rx ring created");
	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
	atomic_set(&ssk->rx_ring.head, 1);
	atomic_set(&ssk->rx_ring.tail, 1);

	ssk->rx_ring.buffer = kmalloc(
	    sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
	if (!ssk->rx_ring.buffer) {
		sdp_warn(ssk->socket,
		    "Unable to allocate RX Ring size %zd.\n",
		    sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);

		return -ENOMEM;
	}

	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
	    ssk->socket, SDP_RX_SIZE, 0);

	if (IS_ERR(rx_cq)) {
		rc = PTR_ERR(rx_cq);
		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
		goto err_cq;
	}

	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
	sdp_arm_rx_cq(ssk);

	return 0;

err_cq:
	kfree(ssk->rx_ring.buffer);
	ssk->rx_ring.buffer = NULL;
	return rc;
}

void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

	cancel_work_sync(&ssk->rx_comp_work);
	rx_ring_destroy_lock(&ssk->rx_ring);

	if (ssk->rx_ring.buffer) {
		sdp_rx_ring_purge(ssk);

		kfree(ssk->rx_ring.buffer);
		ssk->rx_ring.buffer = NULL;
	}

	if (ssk->rx_ring.cq) {
		if (ib_destroy_cq(ssk->rx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
			    ssk->rx_ring.cq);
		} else {
			ssk->rx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}