xref: /freebsd/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c (revision 361e428888e630eb708c72cf31579a25ba5d4f03)
1 /*
2  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include "sdp.h"
33 
34 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
35 		"Receive buffer initial size in bytes.");
36 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
37 		"Receive buffer size scale factor.");
38 
39 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
40 static void
41 sdp_handle_disconn(struct sdp_sock *ssk)
42 {
43 
44 	sdp_dbg(ssk->socket, "%s\n", __func__);
45 
46 	SDP_WLOCK_ASSERT(ssk);
47 	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
48 		socantrcvmore(ssk->socket);
49 
50 	switch (ssk->state) {
51 	case TCPS_SYN_RECEIVED:
52 	case TCPS_ESTABLISHED:
53 		ssk->state = TCPS_CLOSE_WAIT;
54 		break;
55 
56 	case TCPS_FIN_WAIT_1:
57 		/* Received a reply FIN - start Infiniband tear down */
58 		sdp_dbg(ssk->socket,
59 		    "%s: Starting Infiniband tear down sending DREQ\n",
60 		    __func__);
61 
62 		sdp_cancel_dreq_wait_timeout(ssk);
63 		ssk->qp_active = 0;
64 		if (ssk->id) {
65 			struct rdma_cm_id *id;
66 
67 			id = ssk->id;
68 			SDP_WUNLOCK(ssk);
69 			rdma_disconnect(id);
70 			SDP_WLOCK(ssk);
71 		} else {
72 			sdp_warn(ssk->socket,
73 			    "%s: ssk->id is NULL\n", __func__);
74 			return;
75 		}
76 		break;
77 	case TCPS_TIME_WAIT:
78 		/* This is a mutual close situation and we've got the DREQ from
79 		   the peer before the SDP_MID_DISCONNECT */
80 		break;
81 	case TCPS_CLOSED:
82 		/* FIN arrived after IB teardown started - do nothing */
83 		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
84 		    __func__, sdp_state_str(ssk->state));
85 		return;
86 	default:
87 		sdp_warn(ssk->socket,
88 		    "%s: FIN in unexpected state. state=%d\n",
89 		    __func__, ssk->state);
90 		break;
91 	}
92 }
93 
94 static int
95 sdp_post_recv(struct sdp_sock *ssk)
96 {
97 	struct sdp_buf *rx_req;
98 	int i, rc;
99 	u64 addr;
100 	struct ib_device *dev;
101 	struct ib_recv_wr rx_wr = { NULL };
102 	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
103 	struct ib_sge *sge = ibsge;
104 	struct ib_recv_wr *bad_wr;
105 	struct mbuf *mb, *m;
106 	struct sdp_bsdh *h;
107 	int id = ring_head(ssk->rx_ring);
108 
109 	/* Now, allocate and repost recv */
110 	sdp_prf(ssk->socket, mb, "Posting mb");
111 	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
112 	if (mb == NULL) {
113 		/* Retry so we can't stall out with no memory. */
114 		if (!rx_ring_posted(ssk))
115 			queue_work(rx_comp_wq, &ssk->rx_comp_work);
116 		return -1;
117 	}
118 	for (m = mb; m != NULL; m = m->m_next) {
119 		m->m_len = M_SIZE(m);
120 		mb->m_pkthdr.len += m->m_len;
121 	}
122 	h = mtod(mb, struct sdp_bsdh *);
123 	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
124 	rx_req->mb = mb;
125 	dev = ssk->ib_device;
126         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
127 		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
128 		    DMA_TO_DEVICE);
129 		/* TODO: proper error handling */
130 		BUG_ON(ib_dma_mapping_error(dev, addr));
131 		BUG_ON(i >= SDP_MAX_RECV_SGES);
132 		rx_req->mapping[i] = addr;
133 		sge->addr = addr;
134 		sge->length = mb->m_len;
135 		sge->lkey = ssk->sdp_dev->mr->lkey;
136         }
137 
138 	rx_wr.next = NULL;
139 	rx_wr.wr_id = id | SDP_OP_RECV;
140 	rx_wr.sg_list = ibsge;
141 	rx_wr.num_sge = i;
142 	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
143 	if (unlikely(rc)) {
144 		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
145 
146 		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
147 		m_freem(mb);
148 
149 		sdp_notify(ssk, ECONNRESET);
150 
151 		return -1;
152 	}
153 
154 	atomic_inc(&ssk->rx_ring.head);
155 	SDPSTATS_COUNTER_INC(post_recv);
156 
157 	return 0;
158 }
159 
160 static inline int
161 sdp_post_recvs_needed(struct sdp_sock *ssk)
162 {
163 	unsigned long bytes_in_process;
164 	unsigned long max_bytes;
165 	int buffer_size;
166 	int posted;
167 
168 	if (!ssk->qp_active || !ssk->socket)
169 		return 0;
170 
171 	posted = rx_ring_posted(ssk);
172 	if (posted >= SDP_RX_SIZE)
173 		return 0;
174 	if (posted < SDP_MIN_TX_CREDITS)
175 		return 1;
176 
177 	buffer_size = ssk->recv_bytes;
178 	max_bytes = max(ssk->socket->so_snd.sb_hiwat,
179 	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
180 	max_bytes *= rcvbuf_scale;
181 	/*
182 	 * Compute bytes in the receive queue and socket buffer.
183 	 */
184 	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
185 	bytes_in_process += sbused(&ssk->socket->so_rcv);
186 
187 	return bytes_in_process < max_bytes;
188 }
189 
190 static inline void
191 sdp_post_recvs(struct sdp_sock *ssk)
192 {
193 
194 	while (sdp_post_recvs_needed(ssk))
195 		if (sdp_post_recv(ssk))
196 			return;
197 }
198 
199 static inline struct mbuf *
200 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
201 {
202 	struct sdp_sock *ssk = sdp_sk(sk);
203 	struct sdp_bsdh *h;
204 
205 	h = mtod(mb, struct sdp_bsdh *);
206 
207 #ifdef SDP_ZCOPY
208 	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
209 	if (h->mid == SDP_MID_SRCAVAIL) {
210 		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
211 		struct rx_srcavail_state *rx_sa;
212 
213 		ssk->srcavail_cancel_mseq = 0;
214 
215 		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
216 				sizeof(struct rx_srcavail_state), M_NOWAIT);
217 
218 		rx_sa->mseq = ntohl(h->mseq);
219 		rx_sa->used = 0;
220 		rx_sa->len = mb_len = ntohl(srcah->len);
221 		rx_sa->rkey = ntohl(srcah->rkey);
222 		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
223 		rx_sa->flags = 0;
224 
225 		if (ssk->tx_sa) {
226 			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
227 					"for TX SrcAvail. waking up TX SrcAvail"
228 					"to be aborted\n");
229 			wake_up(sk->sk_sleep);
230 		}
231 
232 		atomic_add(mb->len, &ssk->rcv_nxt);
233 		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
234 			mb_len, rx_sa->vaddr);
235 	} else
236 #endif
237 	{
238 		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
239 	}
240 
241 	m_adj(mb, SDP_HEAD_SIZE);
242 	SOCKBUF_LOCK(&sk->so_rcv);
243 	if (unlikely(h->flags & SDP_OOB_PRES))
244 		sdp_urg(ssk, mb);
245 	sbappend_locked(&sk->so_rcv, mb, 0);
246 	sorwakeup_locked(sk);
247 	return mb;
248 }
249 
250 static int
251 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
252 {
253 
254 	return MIN(new_size, SDP_MAX_PACKET);
255 }
256 
257 int
258 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
259 {
260 
261 	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
262 	sdp_post_recvs(ssk);
263 
264 	return 0;
265 }
266 
267 int
268 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
269 {
270 	u32 curr_size = ssk->recv_bytes;
271 	u32 max_size = SDP_MAX_PACKET;
272 
273 	if (new_size > curr_size && new_size <= max_size) {
274 		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
275 		return 0;
276 	}
277 	return -1;
278 }
279 
280 static void
281 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
282 {
283 	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
284 		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
285 	else
286 		ssk->recv_request_head = ring_tail(ssk->rx_ring);
287 	ssk->recv_request = 1;
288 }
289 
290 static void
291 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
292 {
293 	u32 new_size = ntohl(buf->size);
294 
295 	if (new_size > ssk->xmit_size_goal)
296 		ssk->xmit_size_goal = new_size;
297 }
298 
299 static struct mbuf *
300 sdp_recv_completion(struct sdp_sock *ssk, int id)
301 {
302 	struct sdp_buf *rx_req;
303 	struct ib_device *dev;
304 	struct mbuf *mb;
305 
306 	if (unlikely(id != ring_tail(ssk->rx_ring))) {
307 		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
308 			id, ring_tail(ssk->rx_ring));
309 		return NULL;
310 	}
311 
312 	dev = ssk->ib_device;
313 	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
314 	mb = rx_req->mb;
315 	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
316 
317 	atomic_inc(&ssk->rx_ring.tail);
318 	atomic_dec(&ssk->remote_credits);
319 	return mb;
320 }
321 
322 /* socket lock should be taken before calling this */
323 static int
324 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
325 {
326 	struct sdp_bsdh *h;
327 	struct socket *sk;
328 
329 	SDP_WLOCK_ASSERT(ssk);
330 	sk = ssk->socket;
331  	h = mtod(mb, struct sdp_bsdh *);
332 	switch (h->mid) {
333 	case SDP_MID_DATA:
334 	case SDP_MID_SRCAVAIL:
335 		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
336 
337 		/* got data in RCV_SHUTDOWN */
338 		if (ssk->state == TCPS_FIN_WAIT_1) {
339 			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
340 			sdp_notify(ssk, ECONNRESET);
341 		}
342 		m_freem(mb);
343 
344 		break;
345 #ifdef SDP_ZCOPY
346 	case SDP_MID_RDMARDCOMPL:
347 		m_freem(mb);
348 		break;
349 	case SDP_MID_SENDSM:
350 		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
351 		m_freem(mb);
352 		break;
353 	case SDP_MID_SRCAVAIL_CANCEL:
354 		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
355 		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
356 		if (ssk->rx_sa) {
357 			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
358 			ssk->rx_sa->flags |= RX_SA_ABORTED;
359 			ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
360 			                      the dirty logic from recvmsg */
361 		} else {
362 			sdp_dbg(sk, "Got SrcAvailCancel - "
363 					"but no SrcAvail in process\n");
364 		}
365 		m_freem(mb);
366 		break;
367 	case SDP_MID_SINKAVAIL:
368 		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
369 		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
370 		/* FALLTHROUGH */
371 #endif
372 	case SDP_MID_ABORT:
373 		sdp_dbg_data(sk, "Handling ABORT\n");
374 		sdp_prf(sk, NULL, "Handling ABORT");
375 		sdp_notify(ssk, ECONNRESET);
376 		m_freem(mb);
377 		break;
378 	case SDP_MID_DISCONN:
379 		sdp_dbg_data(sk, "Handling DISCONN\n");
380 		sdp_prf(sk, NULL, "Handling DISCONN");
381 		sdp_handle_disconn(ssk);
382 		break;
383 	case SDP_MID_CHRCVBUF:
384 		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
385 		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
386 		m_freem(mb);
387 		break;
388 	case SDP_MID_CHRCVBUF_ACK:
389 		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
390 		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
391 		m_freem(mb);
392 		break;
393 	default:
394 		/* TODO: Handle other messages */
395 		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
396 		m_freem(mb);
397 	}
398 
399 	return 0;
400 }
401 
402 static int
403 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
404 {
405 	struct socket *sk;
406 	struct sdp_bsdh *h;
407 	unsigned long mseq_ack;
408 	int credits_before;
409 
410 	h = mtod(mb, struct sdp_bsdh *);
411 	sk = ssk->socket;
412 	/*
413 	 * If another thread is in so_pcbfree this may be partially torn
414 	 * down but no further synchronization is required as the destroying
415 	 * thread will wait for receive to shutdown before discarding the
416 	 * socket.
417 	 */
418 	if (sk == NULL) {
419 		m_freem(mb);
420 		return 0;
421 	}
422 
423 	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
424 
425 	mseq_ack = ntohl(h->mseq_ack);
426 	credits_before = tx_credits(ssk);
427 	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
428 			1 + ntohs(h->bufs));
429 	if (mseq_ack >= ssk->nagle_last_unacked)
430 		ssk->nagle_last_unacked = 0;
431 
432 	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
433 		mid2str(h->mid), ntohs(h->bufs), credits_before,
434 		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
435 
436 	if (unlikely(h->mid == SDP_MID_DATA &&
437 	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
438 		/* Credit update is valid even after RCV_SHUTDOWN */
439 		m_freem(mb);
440 		return 0;
441 	}
442 
443 	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
444 	    TCPS_HAVERCVDFIN(ssk->state)) {
445 		sdp_prf(sk, NULL, "Control mb - queing to control queue");
446 #ifdef SDP_ZCOPY
447 		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
448 			sdp_dbg_data(sk, "Got SrcAvailCancel. "
449 					"seq: 0x%d seq_ack: 0x%d\n",
450 					ntohl(h->mseq), ntohl(h->mseq_ack));
451 			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
452 		}
453 
454 
455 		if (h->mid == SDP_MID_RDMARDCOMPL) {
456 			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
457 			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
458 			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
459 					ntohl(rrch->len));
460 		}
461 #endif
462 		mb->m_nextpkt = NULL;
463 		if (ssk->rx_ctl_tail)
464 			ssk->rx_ctl_tail->m_nextpkt = mb;
465 		else
466 			ssk->rx_ctl_q = mb;
467 		ssk->rx_ctl_tail = mb;
468 
469 		return 0;
470 	}
471 
472 	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
473 	mb = sdp_sock_queue_rcv_mb(sk, mb);
474 
475 
476 	return 0;
477 }
478 
479 /* called only from irq */
480 static struct mbuf *
481 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
482 {
483 	struct mbuf *mb;
484 	struct sdp_bsdh *h;
485 	struct socket *sk = ssk->socket;
486 	int mseq;
487 
488 	mb = sdp_recv_completion(ssk, wc->wr_id);
489 	if (unlikely(!mb))
490 		return NULL;
491 
492 	if (unlikely(wc->status)) {
493 		if (ssk->qp_active && sk) {
494 			sdp_dbg(sk, "Recv completion with error. "
495 					"Status %d, vendor: %d\n",
496 				wc->status, wc->vendor_err);
497 			sdp_abort(sk);
498 			ssk->qp_active = 0;
499 		}
500 		m_freem(mb);
501 		return NULL;
502 	}
503 
504 	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
505 			(int)wc->wr_id, wc->byte_len);
506 	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
507 		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
508 				wc->byte_len, sizeof(struct sdp_bsdh));
509 		m_freem(mb);
510 		return NULL;
511 	}
512 	/* Use m_adj to trim the tail of data we didn't use. */
513 	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
514 	h = mtod(mb, struct sdp_bsdh *);
515 
516 	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
517 
518 	ssk->rx_packets++;
519 	ssk->rx_bytes += mb->m_pkthdr.len;
520 
521 	mseq = ntohl(h->mseq);
522 	atomic_set(&ssk->mseq_ack, mseq);
523 	if (mseq != (int)wc->wr_id)
524 		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
525 				mseq, (int)wc->wr_id);
526 
527 	return mb;
528 }
529 
530 /* Wakeup writers if we now have credits. */
531 static void
532 sdp_bzcopy_write_space(struct sdp_sock *ssk)
533 {
534 	struct socket *sk = ssk->socket;
535 
536 	if (tx_credits(ssk) >= ssk->min_bufs && sk)
537 		sowwakeup(sk);
538 }
539 
540 /* only from interrupt. */
541 static int
542 sdp_poll_rx_cq(struct sdp_sock *ssk)
543 {
544 	struct ib_cq *cq = ssk->rx_ring.cq;
545 	struct ib_wc ibwc[SDP_NUM_WC];
546 	int n, i;
547 	int wc_processed = 0;
548 	struct mbuf *mb;
549 
550 	do {
551 		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
552 		for (i = 0; i < n; ++i) {
553 			struct ib_wc *wc = &ibwc[i];
554 
555 			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
556 			mb = sdp_process_rx_wc(ssk, wc);
557 			if (!mb)
558 				continue;
559 
560 			sdp_process_rx_mb(ssk, mb);
561 			wc_processed++;
562 		}
563 	} while (n == SDP_NUM_WC);
564 
565 	if (wc_processed)
566 		sdp_bzcopy_write_space(ssk);
567 
568 	return wc_processed;
569 }
570 
571 static void
572 sdp_rx_comp_work(struct work_struct *work)
573 {
574 	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
575 			rx_comp_work);
576 
577 	sdp_prf(ssk->socket, NULL, "%s", __func__);
578 
579 	SDP_WLOCK(ssk);
580 	if (unlikely(!ssk->qp)) {
581 		sdp_prf(ssk->socket, NULL, "qp was destroyed");
582 		goto out;
583 	}
584 	if (unlikely(!ssk->rx_ring.cq)) {
585 		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
586 		goto out;
587 	}
588 
589 	if (unlikely(!ssk->poll_cq)) {
590 		struct rdma_cm_id *id = ssk->id;
591 		if (id && id->qp)
592 			rdma_notify(id, IB_EVENT_COMM_EST);
593 		goto out;
594 	}
595 
596 	sdp_do_posts(ssk);
597 out:
598 	SDP_WUNLOCK(ssk);
599 }
600 
601 void
602 sdp_do_posts(struct sdp_sock *ssk)
603 {
604 	struct socket *sk = ssk->socket;
605 	int xmit_poll_force;
606 	struct mbuf *mb;
607 
608 	SDP_WLOCK_ASSERT(ssk);
609 	if (!ssk->qp_active) {
610 		sdp_dbg(sk, "QP is deactivated\n");
611 		return;
612 	}
613 
614 	while ((mb = ssk->rx_ctl_q)) {
615 		ssk->rx_ctl_q = mb->m_nextpkt;
616 		mb->m_nextpkt = NULL;
617 		sdp_process_rx_ctl_mb(ssk, mb);
618 	}
619 
620 	if (ssk->state == TCPS_TIME_WAIT)
621 		return;
622 
623 	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
624 		return;
625 
626 	sdp_post_recvs(ssk);
627 
628 	if (tx_ring_posted(ssk))
629 		sdp_xmit_poll(ssk, 1);
630 
631 	sdp_post_sends(ssk, M_NOWAIT);
632 
633 	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
634 
635 	if (credit_update_needed(ssk) || xmit_poll_force) {
636 		/* if has pending tx because run out of tx_credits - xmit it */
637 		sdp_prf(sk, NULL, "Processing to free pending sends");
638 		sdp_xmit_poll(ssk,  xmit_poll_force);
639 		sdp_prf(sk, NULL, "Sending credit update");
640 		sdp_post_sends(ssk, M_NOWAIT);
641 	}
642 
643 }
644 
645 int
646 sdp_process_rx(struct sdp_sock *ssk)
647 {
648 	int wc_processed = 0;
649 	int credits_before;
650 
651 	if (!rx_ring_trylock(&ssk->rx_ring)) {
652 		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
653 		return 0;
654 	}
655 
656 	credits_before = tx_credits(ssk);
657 
658 	wc_processed = sdp_poll_rx_cq(ssk);
659 	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
660 
661 	if (wc_processed) {
662 		sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
663 				credits_before, tx_credits(ssk));
664 		queue_work(rx_comp_wq, &ssk->rx_comp_work);
665 	}
666 	sdp_arm_rx_cq(ssk);
667 
668 	rx_ring_unlock(&ssk->rx_ring);
669 
670 	return (wc_processed);
671 }
672 
673 static void
674 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
675 {
676 	struct socket *sk = cq_context;
677 	struct sdp_sock *ssk = sdp_sk(sk);
678 
679 	if (cq != ssk->rx_ring.cq) {
680 		sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
681 		return;
682 	}
683 
684 	SDPSTATS_COUNTER_INC(rx_int_count);
685 
686 	sdp_prf(sk, NULL, "rx irq");
687 
688 	sdp_process_rx(ssk);
689 }
690 
691 static
692 void sdp_rx_ring_purge(struct sdp_sock *ssk)
693 {
694 	while (rx_ring_posted(ssk) > 0) {
695 		struct mbuf *mb;
696 		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
697 		if (!mb)
698 			break;
699 		m_freem(mb);
700 	}
701 }
702 
703 void
704 sdp_rx_ring_init(struct sdp_sock *ssk)
705 {
706 	ssk->rx_ring.buffer = NULL;
707 	ssk->rx_ring.destroyed = 0;
708 	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
709 }
710 
711 static void
712 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
713 {
714 }
715 
716 int
717 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
718 {
719 	struct ib_cq *rx_cq;
720 	int rc = 0;
721 
722 
723 	sdp_dbg(ssk->socket, "rx ring created");
724 	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
725 	atomic_set(&ssk->rx_ring.head, 1);
726 	atomic_set(&ssk->rx_ring.tail, 1);
727 
728 	ssk->rx_ring.buffer = kmalloc(
729 			sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
730 	if (!ssk->rx_ring.buffer) {
731 		sdp_warn(ssk->socket,
732 			"Unable to allocate RX Ring size %zd.\n",
733 			 sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
734 
735 		return -ENOMEM;
736 	}
737 
738 	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
739 			  ssk->socket, SDP_RX_SIZE, 0);
740 
741 	if (IS_ERR(rx_cq)) {
742 		rc = PTR_ERR(rx_cq);
743 		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
744 		goto err_cq;
745 	}
746 
747 	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
748 	sdp_arm_rx_cq(ssk);
749 
750 	return 0;
751 
752 err_cq:
753 	kfree(ssk->rx_ring.buffer);
754 	ssk->rx_ring.buffer = NULL;
755 	return rc;
756 }
757 
758 void
759 sdp_rx_ring_destroy(struct sdp_sock *ssk)
760 {
761 
762 	cancel_work_sync(&ssk->rx_comp_work);
763 	rx_ring_destroy_lock(&ssk->rx_ring);
764 
765 	if (ssk->rx_ring.buffer) {
766 		sdp_rx_ring_purge(ssk);
767 
768 		kfree(ssk->rx_ring.buffer);
769 		ssk->rx_ring.buffer = NULL;
770 	}
771 
772 	if (ssk->rx_ring.cq) {
773 		if (ib_destroy_cq(ssk->rx_ring.cq)) {
774 			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
775 				ssk->rx_ring.cq);
776 		} else {
777 			ssk->rx_ring.cq = NULL;
778 		}
779 	}
780 
781 	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
782 }
783