/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
	"Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
	"Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "%s\n", __func__);

	SDP_WLOCK_ASSERT(ssk);
	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
		socantrcvmore(ssk->socket);

	switch (ssk->state) {
	case TCPS_SYN_RECEIVED:
	case TCPS_ESTABLISHED:
		ssk->state = TCPS_CLOSE_WAIT;
		break;

	case TCPS_FIN_WAIT_1:
		/* Received a reply FIN - start Infiniband tear down */
		sdp_dbg(ssk->socket,
		    "%s: Starting Infiniband tear down sending DREQ\n",
		    __func__);

		sdp_cancel_dreq_wait_timeout(ssk);
		ssk->qp_active = 0;
		if (ssk->id) {
			struct rdma_cm_id *id;

			id = ssk->id;
			SDP_WUNLOCK(ssk);
			rdma_disconnect(id);
			SDP_WLOCK(ssk);
		} else {
			sdp_warn(ssk->socket,
			    "%s: ssk->id is NULL\n", __func__);
			return;
		}
		break;
	case TCPS_TIME_WAIT:
		/*
		 * This is a mutual close situation and we've got the DREQ
		 * from the peer before the SDP_MID_DISCONNECT.
		 */
		break;
	case TCPS_CLOSED:
		/* FIN arrived after IB teardown started - do nothing */
		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
		    __func__, sdp_state_str(ssk->state));
		return;
	default:
		sdp_warn(ssk->socket,
		    "%s: FIN in unexpected state. state=%d\n",
		    __func__, ssk->state);
		break;
	}
}

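/*
 * Allocate an mbuf chain of ssk->recv_bytes bytes, map it for DMA and post
 * it as a single receive work request on the RX ring.  Returns 0 on
 * success and -1 if allocation or posting fails.
 */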
static int
sdp_post_recv(struct sdp_sock *ssk)
{
	struct sdp_buf *rx_req;
	int i, rc;
	u64 addr;
	struct ib_device *dev;
	struct ib_recv_wr rx_wr = { NULL };
	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
	struct ib_sge *sge = ibsge;
	const struct ib_recv_wr *bad_wr;
	struct mbuf *mb, *m;
	struct sdp_bsdh *h;
	int id = ring_head(ssk->rx_ring);

	/* Now, allocate and repost recv */
	sdp_prf(ssk->socket, mb, "Posting mb");
	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
	if (mb == NULL) {
		/* Retry later so we don't stall out with no memory. */
		if (!rx_ring_posted(ssk))
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		return -1;
	}
	for (m = mb; m != NULL; m = m->m_next) {
		m->m_len = M_SIZE(m);
		mb->m_pkthdr.len += m->m_len;
	}
	h = mtod(mb, struct sdp_bsdh *);
	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
	rx_req->mb = mb;
	dev = ssk->ib_device;
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		/*
		 * Map for the device to write into; matches the
		 * DMA_FROM_DEVICE unmap in sdp_cleanup_sdp_buf().
		 */
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_FROM_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_RECV_SGES);
		rx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
	}

	rx_wr.next = NULL;
	rx_wr.wr_id = id | SDP_OP_RECV;
	rx_wr.sg_list = ibsge;
	rx_wr.num_sge = i;
	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
		/* mb was advanced to NULL by the sge loop; free the saved chain. */
		m_freem(rx_req->mb);

		sdp_notify(ssk, ECONNRESET);

		return -1;
	}

	atomic_inc(&ssk->rx_ring.head);
	SDPSTATS_COUNTER_INC(post_recv);

	return 0;
}

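/*
 * Decide whether another receive buffer should be posted: keep at least
 * SDP_MIN_TX_CREDITS buffers outstanding, never exceed SDP_RX_SIZE, and
 * otherwise stop once the bytes posted plus the bytes already queued in
 * the socket buffer reach the scaled receive-buffer limit.
 */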
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
	unsigned long bytes_in_process;
	unsigned long max_bytes;
	int buffer_size;
	int posted;

	if (!ssk->qp_active || !ssk->socket)
		return 0;

	posted = rx_ring_posted(ssk);
	if (posted >= SDP_RX_SIZE)
		return 0;
	if (posted < SDP_MIN_TX_CREDITS)
		return 1;

	buffer_size = ssk->recv_bytes;
	max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
	max_bytes *= rcvbuf_scale;
	/*
	 * Compute bytes in the receive queue and socket buffer.
	 */
	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
	bytes_in_process += sbused(&ssk->socket->so_rcv);

	return bytes_in_process < max_bytes;
}

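/*
 * Keep posting receive buffers until the ring is sufficiently full or an
 * allocation/post attempt fails.
 */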
static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

	while (sdp_post_recvs_needed(ssk))
		if (sdp_post_recv(ssk))
			return;
}

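/*
 * Strip the SDP header from a received data mbuf, account for it in
 * rcv_nxt, and append it to the socket receive buffer, waking up any
 * readers.
 */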
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct sdp_bsdh *h;

	h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
	if (h->mid == SDP_MID_SRCAVAIL) {
		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
		struct rx_srcavail_state *rx_sa;

		ssk->srcavail_cancel_mseq = 0;

		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
		    sizeof(struct rx_srcavail_state), M_NOWAIT);

		rx_sa->mseq = ntohl(h->mseq);
		rx_sa->used = 0;
		rx_sa->len = mb_len = ntohl(srcah->len);
		rx_sa->rkey = ntohl(srcah->rkey);
		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
		rx_sa->flags = 0;

		if (ssk->tx_sa) {
			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
			    "for TX SrcAvail. waking up TX SrcAvail "
			    "to be aborted\n");
			wake_up(sk->sk_sleep);
		}

		atomic_add(mb->len, &ssk->rcv_nxt);
		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
		    mb_len, rx_sa->vaddr);
	} else
#endif
	{
		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
	}

	m_adj(mb, SDP_HEAD_SIZE);
	SOCKBUF_LOCK(&sk->so_rcv);
	if (unlikely(h->flags & SDP_OOB_PRES))
		sdp_urg(ssk, mb);
	sbappend_locked(&sk->so_rcv, mb, 0);
	sorwakeup_locked(sk);
	return mb;
}

static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

	return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
	sdp_post_recvs(ssk);

	return 0;
}

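/*
 * Accept a request to grow the per-buffer receive size; only sizes larger
 * than the current one and no larger than SDP_MAX_PACKET are accepted.
 * Returns 0 on success, -1 otherwise.
 */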
int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
	u32 curr_size = ssk->recv_bytes;
	u32 max_size = SDP_MAX_PACKET;

	if (new_size > curr_size && new_size <= max_size) {
		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
		return 0;
	}
	return -1;
}

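/*
 * Peer requested a new receive buffer size (CHRCVBUF): try to resize and
 * remember from which RX ring slot the new size applies, so the change
 * can be acknowledged.
 */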
static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
	else
		ssk->recv_request_head = ring_tail(ssk->rx_ring);
	ssk->recv_request = 1;
}

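/*
 * Peer acknowledged our buffer size change (CHRCVBUF_ACK); raise the
 * transmit size goal accordingly.
 */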
static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	u32 new_size = ntohl(buf->size);

	if (new_size > ssk->xmit_size_goal)
		ssk->xmit_size_goal = new_size;
}

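/*
 * Retrieve the mbuf associated with a completed receive, unmap its DMA
 * mappings and advance the RX ring tail.  Out-of-order completion ids are
 * rejected.
 */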
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
	struct sdp_buf *rx_req;
	struct ib_device *dev;
	struct mbuf *mb;

	if (unlikely(id != ring_tail(ssk->rx_ring))) {
		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
		    id, ring_tail(ssk->rx_ring));
		return NULL;
	}

	dev = ssk->ib_device;
	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
	mb = rx_req->mb;
	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

	atomic_inc(&ssk->rx_ring.tail);
	atomic_dec(&ssk->remote_credits);
	return mb;
}

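/*
 * Handle a control message (or data received after the socket was shut
 * down for reading) that was queued on rxctlq by the completion path.
 * Runs with the pcb write lock held and consumes the mbuf.
 */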
static void
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_bsdh *h;
	struct socket *sk;

	SDP_WLOCK_ASSERT(ssk);

	sk = ssk->socket;
	h = mtod(mb, struct sdp_bsdh *);
	switch (h->mid) {
	case SDP_MID_DATA:
	case SDP_MID_SRCAVAIL:
		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

		/* got data in RCV_SHUTDOWN */
		if (ssk->state == TCPS_FIN_WAIT_1) {
			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
			sdp_notify(ssk, ECONNRESET);
		}

		break;
#ifdef SDP_ZCOPY
	case SDP_MID_RDMARDCOMPL:
		break;
	case SDP_MID_SENDSM:
		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
		break;
	case SDP_MID_SRCAVAIL_CANCEL:
		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
		if (ssk->rx_sa) {
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
			ssk->rx_sa->flags |= RX_SA_ABORTED;
			/*
			 * TODO: change it into SDP_MID_DATA and get the
			 * dirty logic from recvmsg.
			 */
			ssk->rx_sa = NULL;
		} else {
			sdp_dbg(sk, "Got SrcAvailCancel - "
			    "but no SrcAvail in process\n");
		}
		break;
	case SDP_MID_SINKAVAIL:
		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
		/* FALLTHROUGH */
#endif
	case SDP_MID_ABORT:
		sdp_dbg_data(sk, "Handling ABORT\n");
		sdp_prf(sk, NULL, "Handling ABORT");
		sdp_notify(ssk, ECONNRESET);
		break;
	case SDP_MID_DISCONN:
		sdp_dbg_data(sk, "Handling DISCONN\n");
		sdp_prf(sk, NULL, "Handling DISCONN");
		sdp_handle_disconn(ssk);
		break;
	case SDP_MID_CHRCVBUF:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
		break;
	case SDP_MID_CHRCVBUF_ACK:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
		break;
	default:
		/* TODO: Handle other messages */
		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
		break;
	}
	m_freem(mb);
}

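/*
 * First-level processing of a received message: update the transmit credit
 * count from the header, divert control messages (and anything arriving
 * after a FIN) to the rxctlq control queue, and queue data directly to the
 * socket receive buffer.
 */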
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct socket *sk;
	struct sdp_bsdh *h;
	unsigned long mseq_ack;
	int credits_before;

	h = mtod(mb, struct sdp_bsdh *);
	sk = ssk->socket;
	/*
	 * If another thread is in so_pcbfree this may be partially torn
	 * down but no further synchronization is required as the destroying
	 * thread will wait for receive to shutdown before discarding the
	 * socket.
	 */
	if (sk == NULL) {
		m_freem(mb);
		return 0;
	}

	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

	mseq_ack = ntohl(h->mseq_ack);
	credits_before = tx_credits(ssk);
	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
	    1 + ntohs(h->bufs));
	if (mseq_ack >= ssk->nagle_last_unacked)
		ssk->nagle_last_unacked = 0;

	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
	    mid2str(h->mid), ntohs(h->bufs), credits_before,
	    tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

	if (unlikely(h->mid == SDP_MID_DATA &&
	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
		/* Credit update is valid even after RCV_SHUTDOWN */
		m_freem(mb);
		return 0;
	}

	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
	    TCPS_HAVERCVDFIN(ssk->state)) {
		sdp_prf(sk, NULL, "Control mb - queueing to control queue");
#ifdef SDP_ZCOPY
		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
			sdp_dbg_data(sk, "Got SrcAvailCancel. "
			    "seq: 0x%d seq_ack: 0x%d\n",
			    ntohl(h->mseq), ntohl(h->mseq_ack));
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
		}

		if (h->mid == SDP_MID_RDMARDCOMPL) {
			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
			    ntohl(rrch->len));
		}
#endif
		if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
			m_freem(mb);
		return (0);
	}

	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
	mb = sdp_sock_queue_rcv_mb(sk, mb);

	return 0;
}

/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb;
	struct sdp_bsdh *h;
	struct socket *sk = ssk->socket;
	int mseq;

	mb = sdp_recv_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return NULL;

	if (unlikely(wc->status)) {
		if (ssk->qp_active && sk) {
			sdp_dbg(sk, "Recv completion with error. "
			    "Status %s (%d), vendor: %d\n",
			    ib_wc_status_msg(wc->status), wc->status,
			    wc->vendor_err);
			sdp_abort(sk);
			ssk->qp_active = 0;
		}
		m_freem(mb);
		return NULL;
	}

	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
	    (int)wc->wr_id, wc->byte_len);
	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
		    wc->byte_len, sizeof(struct sdp_bsdh));
		m_freem(mb);
		return NULL;
	}
	/* Use m_adj to trim the tail of data we didn't use. */
	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
	h = mtod(mb, struct sdp_bsdh *);

	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

	ssk->rx_packets++;
	ssk->rx_bytes += mb->m_pkthdr.len;

	mseq = ntohl(h->mseq);
	atomic_set(&ssk->mseq_ack, mseq);
	if (mseq != (int)wc->wr_id)
		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
		    mseq, (int)wc->wr_id);

	return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;

	if (tx_credits(ssk) >= ssk->min_bufs && sk)
		sowwakeup(sk);
}

/* only from interrupt. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
	struct ib_cq *cq = ssk->rx_ring.cq;
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;
	struct mbuf *mb;

	do {
		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			struct ib_wc *wc = &ibwc[i];

			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
			mb = sdp_process_rx_wc(ssk, wc);
			if (!mb)
				continue;

			sdp_process_rx_mb(ssk, mb);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed)
		sdp_bzcopy_write_space(ssk);

	return wc_processed;
}

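/*
 * RX completion work queue handler: take the pcb write lock and, provided
 * the QP and RX CQ still exist, run sdp_do_posts() to drain the control
 * queue and replenish receive buffers.
 */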
static void
sdp_rx_comp_work(struct work_struct *work)
{
	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
	    rx_comp_work);

	sdp_prf(ssk->socket, NULL, "%s", __func__);

	SDP_WLOCK(ssk);
	if (unlikely(!ssk->qp)) {
		sdp_prf(ssk->socket, NULL, "qp was destroyed");
		goto out;
	}
	if (unlikely(!ssk->rx_ring.cq)) {
		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
		goto out;
	}

	if (unlikely(!ssk->poll_cq)) {
		struct rdma_cm_id *id = ssk->id;

		if (id && id->qp)
			rdma_notify(id, IB_EVENT_COMM_EST);
		goto out;
	}

	sdp_do_posts(ssk);
out:
	SDP_WUNLOCK(ssk);
}

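/*
 * Perform the deferred RX/TX work for a connection: process queued control
 * messages, repost receive buffers, reap completed transmits and push out
 * any pending sends (including credit updates).  Called with the pcb write
 * lock held.
 */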
void
sdp_do_posts(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	int xmit_poll_force;
	struct mbuf *mb;

	SDP_WLOCK_ASSERT(ssk);
	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP is deactivated\n");
		return;
	}

	while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
		sdp_process_rx_ctl_mb(ssk, mb);

	if (ssk->state == TCPS_TIME_WAIT)
		return;

	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
		return;

	sdp_post_recvs(ssk);

	if (tx_ring_posted(ssk))
		sdp_xmit_poll(ssk, 1);

	sdp_post_sends(ssk, M_NOWAIT);

	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

	if (credit_update_needed(ssk) || xmit_poll_force) {
		/* If TX is pending because we ran out of credits, send it now. */
		sdp_prf(sk, NULL, "Processing to free pending sends");
		sdp_xmit_poll(ssk, xmit_poll_force);
		sdp_prf(sk, NULL, "Sending credit update");
		sdp_post_sends(ssk, M_NOWAIT);
	}
}

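/*
 * Poll the RX completion queue from interrupt context.  If any completions
 * were processed, the heavier follow-up work is deferred to the rx_comp_wq
 * work queue; the CQ is then re-armed.
 */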
int
sdp_process_rx(struct sdp_sock *ssk)
{
	int wc_processed = 0;
	int credits_before;

	if (!rx_ring_trylock(&ssk->rx_ring)) {
		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
		return 0;
	}

	credits_before = tx_credits(ssk);

	wc_processed = sdp_poll_rx_cq(ssk);
	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

	if (wc_processed) {
		sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
		    credits_before, tx_credits(ssk));
		queue_work(rx_comp_wq, &ssk->rx_comp_work);
	}
	sdp_arm_rx_cq(ssk);

	rx_ring_unlock(&ssk->rx_ring);

	return (wc_processed);
}

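/*
 * RX completion queue interrupt handler; simply hands off to
 * sdp_process_rx().
 */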
static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
	struct sdp_sock *ssk;

	ssk = cq_context;
	KASSERT(cq == ssk->rx_ring.cq,
	    ("%s: mismatched cq on %p", __func__, ssk));

	SDPSTATS_COUNTER_INC(rx_int_count);

	sdp_prf(ssk->socket, NULL, "rx irq");

	sdp_process_rx(ssk);
}

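/*
 * Drop any receive buffers still posted on the RX ring.
 */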
static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
	while (rx_ring_posted(ssk) > 0) {
		struct mbuf *mb;
		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
	ssk->rx_ring.buffer = NULL;
	ssk->rx_ring.destroyed = 0;
	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

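/*
 * Allocate the RX buffer ring and its completion queue for a new
 * connection, then arm the CQ so completions generate interrupts.
 */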
int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq_init_attr rx_cq_attr = {
		.cqe = SDP_RX_SIZE,
		.comp_vector = 0,
		.flags = 0,
	};
	struct ib_cq *rx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "rx ring created");
	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
	atomic_set(&ssk->rx_ring.head, 1);
	atomic_set(&ssk->rx_ring.tail, 1);

	ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
	    M_SDP, M_WAITOK);

	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
	    ssk, &rx_cq_attr);
	if (IS_ERR(rx_cq)) {
		rc = PTR_ERR(rx_cq);
		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
		goto err_cq;
	}

	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
	sdp_arm_rx_cq(ssk);

	return 0;

err_cq:
	free(ssk->rx_ring.buffer, M_SDP);
	ssk->rx_ring.buffer = NULL;
	return rc;
}

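/*
 * Tear down the RX side of a connection: stop the completion work, purge
 * and free the buffer ring, and destroy the completion queue.
 */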
void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

	cancel_work_sync(&ssk->rx_comp_work);
	rx_ring_destroy_lock(&ssk->rx_ring);

	if (ssk->rx_ring.buffer) {
		sdp_rx_ring_purge(ssk);
		free(ssk->rx_ring.buffer, M_SDP);
		ssk->rx_ring.buffer = NULL;
	}

	if (ssk->rx_ring.cq) {
		ib_destroy_cq(ssk->rx_ring.cq);
		ssk->rx_ring.cq = NULL;
	}

	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}