1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0 3 * 4 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the 10 * OpenIB.org BSD license below: 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above 17 * copyright notice, this list of conditions and the following 18 * disclaimer. 19 * 20 * - Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials 23 * provided with the distribution. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32 * SOFTWARE. 33 */ 34 #include "sdp.h" 35 36 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024, 37 "Receive buffer initial size in bytes."); 38 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8, 39 "Receive buffer size scale factor."); 40 41 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */ 42 static void 43 sdp_handle_disconn(struct sdp_sock *ssk) 44 { 45 46 sdp_dbg(ssk->socket, "%s\n", __func__); 47 48 SDP_WLOCK_ASSERT(ssk); 49 if (TCPS_HAVERCVDFIN(ssk->state) == 0) 50 socantrcvmore(ssk->socket); 51 52 switch (ssk->state) { 53 case TCPS_SYN_RECEIVED: 54 case TCPS_ESTABLISHED: 55 ssk->state = TCPS_CLOSE_WAIT; 56 break; 57 58 case TCPS_FIN_WAIT_1: 59 /* Received a reply FIN - start Infiniband tear down */ 60 sdp_dbg(ssk->socket, 61 "%s: Starting Infiniband tear down sending DREQ\n", 62 __func__); 63 64 sdp_cancel_dreq_wait_timeout(ssk); 65 ssk->qp_active = 0; 66 if (ssk->id) { 67 struct rdma_cm_id *id; 68 69 id = ssk->id; 70 SDP_WUNLOCK(ssk); 71 rdma_disconnect(id); 72 SDP_WLOCK(ssk); 73 } else { 74 sdp_warn(ssk->socket, 75 "%s: ssk->id is NULL\n", __func__); 76 return; 77 } 78 break; 79 case TCPS_TIME_WAIT: 80 /* This is a mutual close situation and we've got the DREQ from 81 the peer before the SDP_MID_DISCONNECT */ 82 break; 83 case TCPS_CLOSED: 84 /* FIN arrived after IB teardown started - do nothing */ 85 sdp_dbg(ssk->socket, "%s: fin in state %s\n", 86 __func__, sdp_state_str(ssk->state)); 87 return; 88 default: 89 sdp_warn(ssk->socket, 90 "%s: FIN in unexpected state. state=%d\n", 91 __func__, ssk->state); 92 break; 93 } 94 } 95 96 static int 97 sdp_post_recv(struct sdp_sock *ssk) 98 { 99 struct sdp_buf *rx_req; 100 int i, rc; 101 u64 addr; 102 struct ib_device *dev; 103 struct ib_recv_wr rx_wr = { NULL }; 104 struct ib_sge ibsge[SDP_MAX_RECV_SGES]; 105 struct ib_sge *sge = ibsge; 106 const struct ib_recv_wr *bad_wr; 107 struct mbuf *mb, *m; 108 struct sdp_bsdh *h; 109 int id = ring_head(ssk->rx_ring); 110 111 /* Now, allocate and repost recv */ 112 sdp_prf(ssk->socket, mb, "Posting mb"); 113 mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR); 114 if (mb == NULL) { 115 /* Retry so we can't stall out with no memory. */ 116 if (!rx_ring_posted(ssk)) 117 queue_work(rx_comp_wq, &ssk->rx_comp_work); 118 return -1; 119 } 120 for (m = mb; m != NULL; m = m->m_next) { 121 m->m_len = M_SIZE(m); 122 mb->m_pkthdr.len += m->m_len; 123 } 124 h = mtod(mb, struct sdp_bsdh *); 125 rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1)); 126 rx_req->mb = mb; 127 dev = ssk->ib_device; 128 for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) { 129 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len, 130 DMA_TO_DEVICE); 131 /* TODO: proper error handling */ 132 BUG_ON(ib_dma_mapping_error(dev, addr)); 133 BUG_ON(i >= SDP_MAX_RECV_SGES); 134 rx_req->mapping[i] = addr; 135 sge->addr = addr; 136 sge->length = mb->m_len; 137 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey; 138 } 139 140 rx_wr.next = NULL; 141 rx_wr.wr_id = id | SDP_OP_RECV; 142 rx_wr.sg_list = ibsge; 143 rx_wr.num_sge = i; 144 rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr); 145 if (unlikely(rc)) { 146 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc); 147 148 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE); 149 m_freem(mb); 150 151 sdp_notify(ssk, ECONNRESET); 152 153 return -1; 154 } 155 156 atomic_inc(&ssk->rx_ring.head); 157 SDPSTATS_COUNTER_INC(post_recv); 158 159 return 0; 160 } 161 162 static inline int 163 sdp_post_recvs_needed(struct sdp_sock *ssk) 164 { 165 unsigned long bytes_in_process; 166 unsigned long max_bytes; 167 int buffer_size; 168 int posted; 169 170 if (!ssk->qp_active || !ssk->socket) 171 return 0; 172 173 posted = rx_ring_posted(ssk); 174 if (posted >= SDP_RX_SIZE) 175 return 0; 176 if (posted < SDP_MIN_TX_CREDITS) 177 return 1; 178 179 buffer_size = ssk->recv_bytes; 180 max_bytes = max(ssk->socket->so_rcv.sb_hiwat, 181 (1 + SDP_MIN_TX_CREDITS) * buffer_size); 182 max_bytes *= rcvbuf_scale; 183 /* 184 * Compute bytes in the receive queue and socket buffer. 185 */ 186 bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size; 187 bytes_in_process += sbused(&ssk->socket->so_rcv); 188 189 return bytes_in_process < max_bytes; 190 } 191 192 static inline void 193 sdp_post_recvs(struct sdp_sock *ssk) 194 { 195 196 while (sdp_post_recvs_needed(ssk)) 197 if (sdp_post_recv(ssk)) 198 return; 199 } 200 201 static inline struct mbuf * 202 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb) 203 { 204 struct sdp_sock *ssk = sdp_sk(sk); 205 struct sdp_bsdh *h; 206 207 h = mtod(mb, struct sdp_bsdh *); 208 209 #ifdef SDP_ZCOPY 210 SDP_SKB_CB(mb)->seq = rcv_nxt(ssk); 211 if (h->mid == SDP_MID_SRCAVAIL) { 212 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1); 213 struct rx_srcavail_state *rx_sa; 214 215 ssk->srcavail_cancel_mseq = 0; 216 217 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc( 218 sizeof(struct rx_srcavail_state), M_NOWAIT); 219 220 rx_sa->mseq = ntohl(h->mseq); 221 rx_sa->used = 0; 222 rx_sa->len = mb_len = ntohl(srcah->len); 223 rx_sa->rkey = ntohl(srcah->rkey); 224 rx_sa->vaddr = be64_to_cpu(srcah->vaddr); 225 rx_sa->flags = 0; 226 227 if (ssk->tx_sa) { 228 sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting " 229 "for TX SrcAvail. waking up TX SrcAvail" 230 "to be aborted\n"); 231 wake_up(sk->sk_sleep); 232 } 233 234 atomic_add(mb->len, &ssk->rcv_nxt); 235 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n", 236 mb_len, rx_sa->vaddr); 237 } else 238 #endif 239 { 240 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt); 241 } 242 243 m_adj(mb, SDP_HEAD_SIZE); 244 SOCKBUF_LOCK(&sk->so_rcv); 245 if (unlikely(h->flags & SDP_OOB_PRES)) 246 sdp_urg(ssk, mb); 247 sbappend_locked(&sk->so_rcv, mb, 0); 248 sorwakeup_locked(sk); 249 return mb; 250 } 251 252 static int 253 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size) 254 { 255 256 return MIN(new_size, SDP_MAX_PACKET); 257 } 258 259 int 260 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size) 261 { 262 263 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size); 264 sdp_post_recvs(ssk); 265 266 return 0; 267 } 268 269 int 270 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size) 271 { 272 u32 curr_size = ssk->recv_bytes; 273 u32 max_size = SDP_MAX_PACKET; 274 275 if (new_size > curr_size && new_size <= max_size) { 276 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size); 277 return 0; 278 } 279 return -1; 280 } 281 282 static void 283 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) 284 { 285 if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0) 286 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1; 287 else 288 ssk->recv_request_head = ring_tail(ssk->rx_ring); 289 ssk->recv_request = 1; 290 } 291 292 static void 293 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) 294 { 295 u32 new_size = ntohl(buf->size); 296 297 if (new_size > ssk->xmit_size_goal) 298 ssk->xmit_size_goal = new_size; 299 } 300 301 static struct mbuf * 302 sdp_recv_completion(struct sdp_sock *ssk, int id) 303 { 304 struct sdp_buf *rx_req; 305 struct ib_device *dev; 306 struct mbuf *mb; 307 308 if (unlikely(id != ring_tail(ssk->rx_ring))) { 309 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n", 310 id, ring_tail(ssk->rx_ring)); 311 return NULL; 312 } 313 314 dev = ssk->ib_device; 315 rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)]; 316 mb = rx_req->mb; 317 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE); 318 319 atomic_inc(&ssk->rx_ring.tail); 320 atomic_dec(&ssk->remote_credits); 321 return mb; 322 } 323 324 static void 325 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb) 326 { 327 struct sdp_bsdh *h; 328 struct socket *sk; 329 330 SDP_WLOCK_ASSERT(ssk); 331 332 sk = ssk->socket; 333 h = mtod(mb, struct sdp_bsdh *); 334 switch (h->mid) { 335 case SDP_MID_DATA: 336 case SDP_MID_SRCAVAIL: 337 sdp_dbg(sk, "DATA after socket rcv was shutdown\n"); 338 339 /* got data in RCV_SHUTDOWN */ 340 if (ssk->state == TCPS_FIN_WAIT_1) { 341 sdp_dbg(sk, "RX data when state = FIN_WAIT1\n"); 342 sdp_notify(ssk, ECONNRESET); 343 } 344 345 break; 346 #ifdef SDP_ZCOPY 347 case SDP_MID_RDMARDCOMPL: 348 break; 349 case SDP_MID_SENDSM: 350 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack)); 351 break; 352 case SDP_MID_SRCAVAIL_CANCEL: 353 sdp_dbg_data(sk, "Handling SrcAvailCancel\n"); 354 sdp_prf(sk, NULL, "Handling SrcAvailCancel"); 355 if (ssk->rx_sa) { 356 ssk->srcavail_cancel_mseq = ntohl(h->mseq); 357 ssk->rx_sa->flags |= RX_SA_ABORTED; 358 ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 359 the dirty logic from recvmsg */ 360 } else { 361 sdp_dbg(sk, "Got SrcAvailCancel - " 362 "but no SrcAvail in process\n"); 363 } 364 break; 365 case SDP_MID_SINKAVAIL: 366 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n"); 367 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored"); 368 /* FALLTHROUGH */ 369 #endif 370 case SDP_MID_ABORT: 371 sdp_dbg_data(sk, "Handling ABORT\n"); 372 sdp_prf(sk, NULL, "Handling ABORT"); 373 sdp_notify(ssk, ECONNRESET); 374 break; 375 case SDP_MID_DISCONN: 376 sdp_dbg_data(sk, "Handling DISCONN\n"); 377 sdp_prf(sk, NULL, "Handling DISCONN"); 378 sdp_handle_disconn(ssk); 379 break; 380 case SDP_MID_CHRCVBUF: 381 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n"); 382 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1)); 383 break; 384 case SDP_MID_CHRCVBUF_ACK: 385 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n"); 386 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1)); 387 break; 388 default: 389 /* TODO: Handle other messages */ 390 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid); 391 break; 392 } 393 m_freem(mb); 394 } 395 396 static int 397 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb) 398 { 399 struct socket *sk; 400 struct sdp_bsdh *h; 401 unsigned long mseq_ack; 402 int credits_before; 403 404 h = mtod(mb, struct sdp_bsdh *); 405 sk = ssk->socket; 406 /* 407 * If another thread is in so_pcbfree this may be partially torn 408 * down but no further synchronization is required as the destroying 409 * thread will wait for receive to shutdown before discarding the 410 * socket. 411 */ 412 if (sk == NULL) { 413 m_freem(mb); 414 return 0; 415 } 416 417 SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk)); 418 419 mseq_ack = ntohl(h->mseq_ack); 420 credits_before = tx_credits(ssk); 421 atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) + 422 1 + ntohs(h->bufs)); 423 if (mseq_ack >= ssk->nagle_last_unacked) 424 ssk->nagle_last_unacked = 0; 425 426 sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n", 427 mid2str(h->mid), ntohs(h->bufs), credits_before, 428 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack)); 429 430 if (unlikely(h->mid == SDP_MID_DATA && 431 mb->m_pkthdr.len == SDP_HEAD_SIZE)) { 432 /* Credit update is valid even after RCV_SHUTDOWN */ 433 m_freem(mb); 434 return 0; 435 } 436 437 if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) || 438 TCPS_HAVERCVDFIN(ssk->state)) { 439 sdp_prf(sk, NULL, "Control mb - queing to control queue"); 440 #ifdef SDP_ZCOPY 441 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) { 442 sdp_dbg_data(sk, "Got SrcAvailCancel. " 443 "seq: 0x%d seq_ack: 0x%d\n", 444 ntohl(h->mseq), ntohl(h->mseq_ack)); 445 ssk->srcavail_cancel_mseq = ntohl(h->mseq); 446 } 447 448 449 if (h->mid == SDP_MID_RDMARDCOMPL) { 450 struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1); 451 sdp_dbg_data(sk, "RdmaRdCompl message arrived\n"); 452 sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack), 453 ntohl(rrch->len)); 454 } 455 #endif 456 if (mbufq_enqueue(&ssk->rxctlq, mb) != 0) 457 m_freem(mb); 458 return (0); 459 } 460 461 sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid)); 462 mb = sdp_sock_queue_rcv_mb(sk, mb); 463 464 465 return 0; 466 } 467 468 /* called only from irq */ 469 static struct mbuf * 470 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc) 471 { 472 struct mbuf *mb; 473 struct sdp_bsdh *h; 474 struct socket *sk = ssk->socket; 475 int mseq; 476 477 mb = sdp_recv_completion(ssk, wc->wr_id); 478 if (unlikely(!mb)) 479 return NULL; 480 481 if (unlikely(wc->status)) { 482 if (ssk->qp_active && sk) { 483 sdp_dbg(sk, "Recv completion with error. " 484 "Status %s (%d), vendor: %d\n", 485 ib_wc_status_msg(wc->status), wc->status, 486 wc->vendor_err); 487 sdp_abort(sk); 488 ssk->qp_active = 0; 489 } 490 m_freem(mb); 491 return NULL; 492 } 493 494 sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n", 495 (int)wc->wr_id, wc->byte_len); 496 if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) { 497 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n", 498 wc->byte_len, sizeof(struct sdp_bsdh)); 499 m_freem(mb); 500 return NULL; 501 } 502 /* Use m_adj to trim the tail of data we didn't use. */ 503 m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len)); 504 h = mtod(mb, struct sdp_bsdh *); 505 506 SDP_DUMP_PACKET(ssk->socket, "RX", mb, h); 507 508 ssk->rx_packets++; 509 ssk->rx_bytes += mb->m_pkthdr.len; 510 511 mseq = ntohl(h->mseq); 512 atomic_set(&ssk->mseq_ack, mseq); 513 if (mseq != (int)wc->wr_id) 514 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n", 515 mseq, (int)wc->wr_id); 516 517 return mb; 518 } 519 520 /* Wakeup writers if we now have credits. */ 521 static void 522 sdp_bzcopy_write_space(struct sdp_sock *ssk) 523 { 524 struct socket *sk = ssk->socket; 525 526 if (tx_credits(ssk) >= ssk->min_bufs && sk) 527 sowwakeup(sk); 528 } 529 530 /* only from interrupt. */ 531 static int 532 sdp_poll_rx_cq(struct sdp_sock *ssk) 533 { 534 struct ib_cq *cq = ssk->rx_ring.cq; 535 struct ib_wc ibwc[SDP_NUM_WC]; 536 int n, i; 537 int wc_processed = 0; 538 struct mbuf *mb; 539 540 do { 541 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); 542 for (i = 0; i < n; ++i) { 543 struct ib_wc *wc = &ibwc[i]; 544 545 BUG_ON(!(wc->wr_id & SDP_OP_RECV)); 546 mb = sdp_process_rx_wc(ssk, wc); 547 if (!mb) 548 continue; 549 550 sdp_process_rx_mb(ssk, mb); 551 wc_processed++; 552 } 553 } while (n == SDP_NUM_WC); 554 555 if (wc_processed) 556 sdp_bzcopy_write_space(ssk); 557 558 return wc_processed; 559 } 560 561 static void 562 sdp_rx_comp_work(struct work_struct *work) 563 { 564 struct sdp_sock *ssk = container_of(work, struct sdp_sock, 565 rx_comp_work); 566 567 sdp_prf(ssk->socket, NULL, "%s", __func__); 568 569 SDP_WLOCK(ssk); 570 if (unlikely(!ssk->qp)) { 571 sdp_prf(ssk->socket, NULL, "qp was destroyed"); 572 goto out; 573 } 574 if (unlikely(!ssk->rx_ring.cq)) { 575 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL"); 576 goto out; 577 } 578 579 if (unlikely(!ssk->poll_cq)) { 580 struct rdma_cm_id *id = ssk->id; 581 if (id && id->qp) 582 rdma_notify(id, IB_EVENT_COMM_EST); 583 goto out; 584 } 585 586 sdp_do_posts(ssk); 587 out: 588 SDP_WUNLOCK(ssk); 589 } 590 591 void 592 sdp_do_posts(struct sdp_sock *ssk) 593 { 594 struct socket *sk = ssk->socket; 595 int xmit_poll_force; 596 struct mbuf *mb; 597 598 SDP_WLOCK_ASSERT(ssk); 599 if (!ssk->qp_active) { 600 sdp_dbg(sk, "QP is deactivated\n"); 601 return; 602 } 603 604 while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL) 605 sdp_process_rx_ctl_mb(ssk, mb); 606 607 if (ssk->state == TCPS_TIME_WAIT) 608 return; 609 610 if (!ssk->rx_ring.cq || !ssk->tx_ring.cq) 611 return; 612 613 sdp_post_recvs(ssk); 614 615 if (tx_ring_posted(ssk)) 616 sdp_xmit_poll(ssk, 1); 617 618 sdp_post_sends(ssk, M_NOWAIT); 619 620 xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS; 621 622 if (credit_update_needed(ssk) || xmit_poll_force) { 623 /* if has pending tx because run out of tx_credits - xmit it */ 624 sdp_prf(sk, NULL, "Processing to free pending sends"); 625 sdp_xmit_poll(ssk, xmit_poll_force); 626 sdp_prf(sk, NULL, "Sending credit update"); 627 sdp_post_sends(ssk, M_NOWAIT); 628 } 629 630 } 631 632 int 633 sdp_process_rx(struct sdp_sock *ssk) 634 { 635 int wc_processed = 0; 636 int credits_before; 637 638 if (!rx_ring_trylock(&ssk->rx_ring)) { 639 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n"); 640 return 0; 641 } 642 643 credits_before = tx_credits(ssk); 644 645 wc_processed = sdp_poll_rx_cq(ssk); 646 sdp_prf(ssk->socket, NULL, "processed %d", wc_processed); 647 648 if (wc_processed) { 649 sdp_prf(ssk->socket, NULL, "credits: %d -> %d", 650 credits_before, tx_credits(ssk)); 651 queue_work(rx_comp_wq, &ssk->rx_comp_work); 652 } 653 sdp_arm_rx_cq(ssk); 654 655 rx_ring_unlock(&ssk->rx_ring); 656 657 return (wc_processed); 658 } 659 660 static void 661 sdp_rx_irq(struct ib_cq *cq, void *cq_context) 662 { 663 struct sdp_sock *ssk; 664 665 ssk = cq_context; 666 KASSERT(cq == ssk->rx_ring.cq, 667 ("%s: mismatched cq on %p", __func__, ssk)); 668 669 SDPSTATS_COUNTER_INC(rx_int_count); 670 671 sdp_prf(sk, NULL, "rx irq"); 672 673 sdp_process_rx(ssk); 674 } 675 676 static 677 void sdp_rx_ring_purge(struct sdp_sock *ssk) 678 { 679 while (rx_ring_posted(ssk) > 0) { 680 struct mbuf *mb; 681 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring)); 682 if (!mb) 683 break; 684 m_freem(mb); 685 } 686 } 687 688 void 689 sdp_rx_ring_init(struct sdp_sock *ssk) 690 { 691 ssk->rx_ring.buffer = NULL; 692 ssk->rx_ring.destroyed = 0; 693 rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock"); 694 } 695 696 static void 697 sdp_rx_cq_event_handler(struct ib_event *event, void *data) 698 { 699 } 700 701 int 702 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device) 703 { 704 struct ib_cq_init_attr rx_cq_attr = { 705 .cqe = SDP_RX_SIZE, 706 .comp_vector = 0, 707 .flags = 0, 708 }; 709 struct ib_cq *rx_cq; 710 int rc = 0; 711 712 sdp_dbg(ssk->socket, "rx ring created"); 713 INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work); 714 atomic_set(&ssk->rx_ring.head, 1); 715 atomic_set(&ssk->rx_ring.tail, 1); 716 717 ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE, 718 M_SDP, M_WAITOK); 719 720 rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler, 721 ssk, &rx_cq_attr); 722 if (IS_ERR(rx_cq)) { 723 rc = PTR_ERR(rx_cq); 724 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc); 725 goto err_cq; 726 } 727 728 sdp_sk(ssk->socket)->rx_ring.cq = rx_cq; 729 sdp_arm_rx_cq(ssk); 730 731 return 0; 732 733 err_cq: 734 free(ssk->rx_ring.buffer, M_SDP); 735 ssk->rx_ring.buffer = NULL; 736 return rc; 737 } 738 739 void 740 sdp_rx_ring_destroy(struct sdp_sock *ssk) 741 { 742 743 cancel_work_sync(&ssk->rx_comp_work); 744 rx_ring_destroy_lock(&ssk->rx_ring); 745 746 if (ssk->rx_ring.buffer) { 747 sdp_rx_ring_purge(ssk); 748 free(ssk->rx_ring.buffer, M_SDP); 749 ssk->rx_ring.buffer = NULL; 750 } 751 752 if (ssk->rx_ring.cq) { 753 if (ib_destroy_cq(ssk->rx_ring.cq)) { 754 sdp_warn(ssk->socket, "destroy cq(%p) failed\n", 755 ssk->rx_ring.cq); 756 } else { 757 ssk->rx_ring.cq = NULL; 758 } 759 } 760 761 WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring)); 762 } 763