xref: /freebsd/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c (revision 63d1fd5970ec814904aa0f4580b10a0d302d08b2)
1 /*
2  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include "sdp.h"
33 
34 #define sdp_cnt(var) do { (var)++; } while (0)
35 
36 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
37 		"Total number of keepalive probes sent.");
38 
39 static int sdp_process_tx_cq(struct sdp_sock *ssk);
40 static void sdp_poll_tx_timeout(void *data);
41 
42 int
43 sdp_xmit_poll(struct sdp_sock *ssk, int force)
44 {
45 	int wc_processed = 0;
46 
47 	SDP_WLOCK_ASSERT(ssk);
48 	sdp_prf(ssk->socket, NULL, "%s", __func__);
49 
50 	/* If we don't have a pending timer, set one up to catch our recent
51 	   post in case the interface becomes idle */
52 	if (!callout_pending(&ssk->tx_ring.timer))
53 		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
54 		    sdp_poll_tx_timeout, ssk);
55 
56 	/* Poll the CQ every SDP_TX_POLL_MODER packets */
57 	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
58 		wc_processed = sdp_process_tx_cq(ssk);
59 
60 	return wc_processed;
61 }
62 
63 void
64 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
65 {
66 	struct sdp_buf *tx_req;
67 	struct sdp_bsdh *h;
68 	unsigned long mseq;
69 	struct ib_device *dev;
70 	struct ib_send_wr *bad_wr;
71 	struct ib_sge ibsge[SDP_MAX_SEND_SGES];
72 	struct ib_sge *sge;
73 	struct ib_send_wr tx_wr = { NULL };
74 	int i, rc;
75 	u64 addr;
76 
77 	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
78 	SDPSTATS_HIST(send_size, mb->len);
79 
80 	if (!ssk->qp_active) {
81 		m_freem(mb);
82 		return;
83 	}
84 
85 	mseq = ring_head(ssk->tx_ring);
86 	h = mtod(mb, struct sdp_bsdh *);
87 	ssk->tx_packets++;
88 	ssk->tx_bytes += mb->m_pkthdr.len;
89 
90 #ifdef SDP_ZCOPY
91 	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
92 		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
93 		if (ssk->tx_sa != tx_sa) {
94 			sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
95 					"before being sent!\n");
96 			WARN_ON(1);
97 			m_freem(mb);
98 			return;
99 		}
100 		TX_SRCAVAIL_STATE(mb)->mseq = mseq;
101 	}
102 #endif
103 
104 	if (unlikely(mb->m_flags & M_URG))
105 		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
106 	else
107 		h->flags = 0;
108 
109 	mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
110 	h->bufs = htons(rx_ring_posted(ssk));
111 	h->len = htonl(mb->m_pkthdr.len);
112 	h->mseq = htonl(mseq);
113 	h->mseq_ack = htonl(mseq_ack(ssk));
114 
115 	sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
116 			mid2str(h->mid), rx_ring_posted(ssk), mseq,
117 			ntohl(h->mseq_ack));
118 
119 	SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);
120 
121 	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
122 	tx_req->mb = mb;
123 	dev = ssk->ib_device;
124 	sge = &ibsge[0];
125 	for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
126 		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
127 		    DMA_TO_DEVICE);
128 		/* TODO: proper error handling */
129 		BUG_ON(ib_dma_mapping_error(dev, addr));
130 		BUG_ON(i >= SDP_MAX_SEND_SGES);
131 		tx_req->mapping[i] = addr;
132 		sge->addr = addr;
133 		sge->length = mb->m_len;
134 		sge->lkey = ssk->sdp_dev->mr->lkey;
135 	}
136 	tx_wr.next = NULL;
137 	tx_wr.wr_id = mseq | SDP_OP_SEND;
138 	tx_wr.sg_list = ibsge;
139 	tx_wr.num_sge = i;
140 	tx_wr.opcode = IB_WR_SEND;
141 	tx_wr.send_flags = IB_SEND_SIGNALED;
142 	if (unlikely(tx_req->mb->m_flags & M_URG))
143 		tx_wr.send_flags |= IB_SEND_SOLICITED;
144 
145 	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
146 	if (unlikely(rc)) {
147 		sdp_dbg(ssk->socket,
148 				"ib_post_send failed with status %d.\n", rc);
149 
150 		sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
151 
152 		sdp_notify(ssk, ECONNRESET);
153 		m_freem(tx_req->mb);
154 		return;
155 	}
156 
157 	atomic_inc(&ssk->tx_ring.head);
158 	atomic_dec(&ssk->tx_ring.credits);
159 	atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
160 
161 	return;
162 }
163 
164 static struct mbuf *
165 sdp_send_completion(struct sdp_sock *ssk, int mseq)
166 {
167 	struct ib_device *dev;
168 	struct sdp_buf *tx_req;
169 	struct mbuf *mb = NULL;
170 	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
171 
172 	if (unlikely(mseq != ring_tail(*tx_ring))) {
173 		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
174 			mseq, ring_tail(*tx_ring));
175 		goto out;
176 	}
177 
178 	dev = ssk->ib_device;
179 	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
180 	mb = tx_req->mb;
181 	sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
182 
183 #ifdef SDP_ZCOPY
184 	/* TODO: AIO and real zcopy code; add their context support here */
185 	if (BZCOPY_STATE(mb))
186 		BZCOPY_STATE(mb)->busy--;
187 #endif
188 
189 	atomic_inc(&tx_ring->tail);
190 
191 out:
192 	return mb;
193 }
194 
195 static int
196 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
197 {
198 	struct mbuf *mb = NULL;
199 	struct sdp_bsdh *h;
200 
201 	if (unlikely(wc->status)) {
202 		if (wc->status != IB_WC_WR_FLUSH_ERR) {
203 			sdp_prf(ssk->socket, mb, "Send completion with error. "
204 				"Status %d", wc->status);
205 			sdp_dbg_data(ssk->socket, "Send completion with error. "
206 				"Status %d\n", wc->status);
207 			sdp_notify(ssk, ECONNRESET);
208 		}
209 	}
210 
211 	mb = sdp_send_completion(ssk, wc->wr_id);
212 	if (unlikely(!mb))
213 		return -1;
214 
215 	h = mtod(mb, struct sdp_bsdh *);
216 	sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
217 	sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
218 	    mb, mb->m_pkthdr.len, ntohl(h->mseq));
219 	m_freem(mb);
220 
221 	return 0;
222 }
223 
224 static inline void
225 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
226 {
227 
228 	if (likely(wc->wr_id & SDP_OP_SEND)) {
229 		sdp_handle_send_comp(ssk, wc);
230 		return;
231 	}
232 
233 #ifdef SDP_ZCOPY
234 	if (wc->wr_id & SDP_OP_RDMA) {
235 		/* TODO: handle failed RDMA read cqe */
236 
237 		sdp_dbg_data(ssk->socket,
238 	 	    "TX comp: RDMA read. status: %d\n", wc->status);
239 		sdp_prf1(sk, NULL, "TX comp: RDMA read");
240 
241 		if (!ssk->tx_ring.rdma_inflight) {
242 			sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
243 			return;
244 		}
245 
246 		if (!ssk->tx_ring.rdma_inflight->busy) {
247 			sdp_warn(ssk->socket,
248 			    "ERROR: too many RDMA read completions\n");
249 			return;
250 		}
251 
252 		/* Only last RDMA read WR is signalled. Order is guaranteed -
253 		 * therefore if Last RDMA read WR is completed - all other
254 		 * have, too */
255 		ssk->tx_ring.rdma_inflight->busy = 0;
256 		sowwakeup(ssk->socket);
257 		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
258 		return;
259 	}
260 #endif
261 
262 	/* Keepalive probe sent cleanup */
263 	sdp_cnt(sdp_keepalive_probes_sent);
264 
265 	if (likely(!wc->status))
266 		return;
267 
268 	sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
269 			__func__, wc->status);
270 
271 	if (wc->status == IB_WC_WR_FLUSH_ERR)
272 		return;
273 
274 	sdp_notify(ssk, ECONNRESET);
275 }
276 
277 static int
278 sdp_process_tx_cq(struct sdp_sock *ssk)
279 {
280 	struct ib_wc ibwc[SDP_NUM_WC];
281 	int n, i;
282 	int wc_processed = 0;
283 
284 	SDP_WLOCK_ASSERT(ssk);
285 
286 	if (!ssk->tx_ring.cq) {
287 		sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
288 		return 0;
289 	}
290 
291 	do {
292 		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
293 		for (i = 0; i < n; ++i) {
294 			sdp_process_tx_wc(ssk, ibwc + i);
295 			wc_processed++;
296 		}
297 	} while (n == SDP_NUM_WC);
298 
299 	if (wc_processed) {
300 		sdp_post_sends(ssk, M_NOWAIT);
301 		sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d",
302 				(u32) tx_ring_posted(ssk));
303 		sowwakeup(ssk->socket);
304 	}
305 
306 	return wc_processed;
307 }
308 
309 static void
310 sdp_poll_tx(struct sdp_sock *ssk)
311 {
312 	struct socket *sk = ssk->socket;
313 	u32 inflight, wc_processed;
314 
315 	sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
316 		(u32) tx_ring_posted(ssk),
317 		ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
318 
319 	if (unlikely(ssk->state == TCPS_CLOSED)) {
320 		sdp_warn(sk, "Socket is closed\n");
321 		goto out;
322 	}
323 
324 	wc_processed = sdp_process_tx_cq(ssk);
325 	if (!wc_processed)
326 		SDPSTATS_COUNTER_INC(tx_poll_miss);
327 	else
328 		SDPSTATS_COUNTER_INC(tx_poll_hit);
329 
330 	inflight = (u32) tx_ring_posted(ssk);
331 	sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
332 	    inflight);
333 
334 	/* If there are still packets in flight and the timer has not already
335 	 * been scheduled by the Tx routine then schedule it here to guarantee
336 	 * completion processing of these packets */
337 	if (inflight)
338 		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
339 		    sdp_poll_tx_timeout, ssk);
340 out:
341 #ifdef SDP_ZCOPY
342 	if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
343 		sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
344 		sdp_arm_tx_cq(ssk);
345 	}
346 #endif
347 	return;
348 }
349 
350 static void
351 sdp_poll_tx_timeout(void *data)
352 {
353 	struct sdp_sock *ssk = (struct sdp_sock *)data;
354 
355 	if (!callout_active(&ssk->tx_ring.timer))
356 		return;
357 	callout_deactivate(&ssk->tx_ring.timer);
358 	sdp_poll_tx(ssk);
359 }
360 
361 static void
362 sdp_tx_irq(struct ib_cq *cq, void *cq_context)
363 {
364 	struct sdp_sock *ssk;
365 
366 	ssk = cq_context;
367 	sdp_prf1(ssk->socket, NULL, "tx irq");
368 	sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
369 	SDPSTATS_COUNTER_INC(tx_int_count);
370 	SDP_WLOCK(ssk);
371 	sdp_poll_tx(ssk);
372 	SDP_WUNLOCK(ssk);
373 }
374 
375 static
376 void sdp_tx_ring_purge(struct sdp_sock *ssk)
377 {
378 	while (tx_ring_posted(ssk)) {
379 		struct mbuf *mb;
380 		mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
381 		if (!mb)
382 			break;
383 		m_freem(mb);
384 	}
385 }
386 
387 void
388 sdp_post_keepalive(struct sdp_sock *ssk)
389 {
390 	int rc;
391 	struct ib_send_wr wr, *bad_wr;
392 
393 	sdp_dbg(ssk->socket, "%s\n", __func__);
394 
395 	memset(&wr, 0, sizeof(wr));
396 
397 	wr.next    = NULL;
398 	wr.wr_id   = 0;
399 	wr.sg_list = NULL;
400 	wr.num_sge = 0;
401 	wr.opcode  = IB_WR_RDMA_WRITE;
402 
403 	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
404 	if (rc) {
405 		sdp_dbg(ssk->socket,
406 			"ib_post_keepalive failed with status %d.\n", rc);
407 		sdp_notify(ssk, ECONNRESET);
408 	}
409 
410 	sdp_cnt(sdp_keepalive_probes_sent);
411 }
412 
/*
 * Asynchronous event callback handed to ib_create_cq() for the TX CQ.
 * Events are intentionally ignored.
 */
static void
sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
	/* Nothing to do. */
}
417 
418 int
419 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
420 {
421 	struct ib_cq *tx_cq;
422 	int rc = 0;
423 
424 	sdp_dbg(ssk->socket, "tx ring create\n");
425 	callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
426 	callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
427 	atomic_set(&ssk->tx_ring.head, 1);
428 	atomic_set(&ssk->tx_ring.tail, 1);
429 
430 	ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
431 	    M_SDP, M_WAITOK);
432 
433 	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
434 			  ssk, SDP_TX_SIZE, 0);
435 	if (IS_ERR(tx_cq)) {
436 		rc = PTR_ERR(tx_cq);
437 		sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
438 		goto err_cq;
439 	}
440 	ssk->tx_ring.cq = tx_cq;
441 	ssk->tx_ring.poll_cnt = 0;
442 	sdp_arm_tx_cq(ssk);
443 
444 	return 0;
445 
446 err_cq:
447 	free(ssk->tx_ring.buffer, M_SDP);
448 	ssk->tx_ring.buffer = NULL;
449 	return rc;
450 }
451 
452 void
453 sdp_tx_ring_destroy(struct sdp_sock *ssk)
454 {
455 
456 	sdp_dbg(ssk->socket, "tx ring destroy\n");
457 	SDP_WLOCK(ssk);
458 	callout_stop(&ssk->tx_ring.timer);
459 	callout_stop(&ssk->nagle_timer);
460 	SDP_WUNLOCK(ssk);
461 	callout_drain(&ssk->tx_ring.timer);
462 	callout_drain(&ssk->nagle_timer);
463 
464 	if (ssk->tx_ring.buffer) {
465 		sdp_tx_ring_purge(ssk);
466 		free(ssk->tx_ring.buffer, M_SDP);
467 		ssk->tx_ring.buffer = NULL;
468 	}
469 
470 	if (ssk->tx_ring.cq) {
471 		if (ib_destroy_cq(ssk->tx_ring.cq)) {
472 			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
473 					ssk->tx_ring.cq);
474 		} else {
475 			ssk->tx_ring.cq = NULL;
476 		}
477 	}
478 
479 	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
480 }
481