xref: /freebsd/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c (revision bdafb02fcb88389fd1ab684cfe734cb429d35618)
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
3  *
4  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  */
34 #include "sdp.h"
35 
36 #define sdp_cnt(var) do { (var)++; } while (0)
37 
38 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
39 		"Total number of keepalive probes sent.");
40 
41 static int sdp_process_tx_cq(struct sdp_sock *ssk);
42 static void sdp_poll_tx_timeout(void *data);
43 
44 int
45 sdp_xmit_poll(struct sdp_sock *ssk, int force)
46 {
47 	int wc_processed = 0;
48 
49 	SDP_WLOCK_ASSERT(ssk);
50 	sdp_prf(ssk->socket, NULL, "%s", __func__);
51 
52 	/* If we don't have a pending timer, set one up to catch our recent
53 	   post in case the interface becomes idle */
54 	if (!callout_pending(&ssk->tx_ring.timer))
55 		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
56 		    sdp_poll_tx_timeout, ssk);
57 
58 	/* Poll the CQ every SDP_TX_POLL_MODER packets */
59 	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
60 		wc_processed = sdp_process_tx_cq(ssk);
61 
62 	return wc_processed;
63 }
64 
65 void
66 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
67 {
68 	struct sdp_buf *tx_req;
69 	struct sdp_bsdh *h;
70 	unsigned long mseq;
71 	struct ib_device *dev;
72 	struct ib_send_wr *bad_wr;
73 	struct ib_sge ibsge[SDP_MAX_SEND_SGES];
74 	struct ib_sge *sge;
75 	struct ib_send_wr tx_wr = { NULL };
76 	int i, rc;
77 	u64 addr;
78 
79 	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
80 	SDPSTATS_HIST(send_size, mb->len);
81 
82 	if (!ssk->qp_active) {
83 		m_freem(mb);
84 		return;
85 	}
86 
87 	mseq = ring_head(ssk->tx_ring);
88 	h = mtod(mb, struct sdp_bsdh *);
89 	ssk->tx_packets++;
90 	ssk->tx_bytes += mb->m_pkthdr.len;
91 
92 #ifdef SDP_ZCOPY
93 	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
94 		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
95 		if (ssk->tx_sa != tx_sa) {
96 			sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
97 					"before being sent!\n");
98 			WARN_ON(1);
99 			m_freem(mb);
100 			return;
101 		}
102 		TX_SRCAVAIL_STATE(mb)->mseq = mseq;
103 	}
104 #endif
105 
106 	if (unlikely(mb->m_flags & M_URG))
107 		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
108 	else
109 		h->flags = 0;
110 
111 	mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
112 	h->bufs = htons(rx_ring_posted(ssk));
113 	h->len = htonl(mb->m_pkthdr.len);
114 	h->mseq = htonl(mseq);
115 	h->mseq_ack = htonl(mseq_ack(ssk));
116 
117 	sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
118 			mid2str(h->mid), rx_ring_posted(ssk), mseq,
119 			ntohl(h->mseq_ack));
120 
121 	SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);
122 
123 	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
124 	tx_req->mb = mb;
125 	dev = ssk->ib_device;
126 	sge = &ibsge[0];
127 	for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
128 		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
129 		    DMA_TO_DEVICE);
130 		/* TODO: proper error handling */
131 		BUG_ON(ib_dma_mapping_error(dev, addr));
132 		BUG_ON(i >= SDP_MAX_SEND_SGES);
133 		tx_req->mapping[i] = addr;
134 		sge->addr = addr;
135 		sge->length = mb->m_len;
136 		sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
137 	}
138 	tx_wr.next = NULL;
139 	tx_wr.wr_id = mseq | SDP_OP_SEND;
140 	tx_wr.sg_list = ibsge;
141 	tx_wr.num_sge = i;
142 	tx_wr.opcode = IB_WR_SEND;
143 	tx_wr.send_flags = IB_SEND_SIGNALED;
144 	if (unlikely(tx_req->mb->m_flags & M_URG))
145 		tx_wr.send_flags |= IB_SEND_SOLICITED;
146 
147 	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
148 	if (unlikely(rc)) {
149 		sdp_dbg(ssk->socket,
150 				"ib_post_send failed with status %d.\n", rc);
151 
152 		sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
153 
154 		sdp_notify(ssk, ECONNRESET);
155 		m_freem(tx_req->mb);
156 		return;
157 	}
158 
159 	atomic_inc(&ssk->tx_ring.head);
160 	atomic_dec(&ssk->tx_ring.credits);
161 	atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
162 
163 	return;
164 }
165 
166 static struct mbuf *
167 sdp_send_completion(struct sdp_sock *ssk, int mseq)
168 {
169 	struct ib_device *dev;
170 	struct sdp_buf *tx_req;
171 	struct mbuf *mb = NULL;
172 	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
173 
174 	if (unlikely(mseq != ring_tail(*tx_ring))) {
175 		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
176 			mseq, ring_tail(*tx_ring));
177 		goto out;
178 	}
179 
180 	dev = ssk->ib_device;
181 	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
182 	mb = tx_req->mb;
183 	sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
184 
185 #ifdef SDP_ZCOPY
186 	/* TODO: AIO and real zcopy code; add their context support here */
187 	if (BZCOPY_STATE(mb))
188 		BZCOPY_STATE(mb)->busy--;
189 #endif
190 
191 	atomic_inc(&tx_ring->tail);
192 
193 out:
194 	return mb;
195 }
196 
197 static int
198 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
199 {
200 	struct mbuf *mb = NULL;
201 	struct sdp_bsdh *h;
202 
203 	if (unlikely(wc->status)) {
204 		if (wc->status != IB_WC_WR_FLUSH_ERR) {
205 			sdp_prf(ssk->socket, mb, "Send completion with error. "
206 				"Status %d", wc->status);
207 			sdp_dbg_data(ssk->socket, "Send completion with error. "
208 				"Status %d\n", wc->status);
209 			sdp_notify(ssk, ECONNRESET);
210 		}
211 	}
212 
213 	mb = sdp_send_completion(ssk, wc->wr_id);
214 	if (unlikely(!mb))
215 		return -1;
216 
217 	h = mtod(mb, struct sdp_bsdh *);
218 	sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
219 	sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
220 	    mb, mb->m_pkthdr.len, ntohl(h->mseq));
221 	m_freem(mb);
222 
223 	return 0;
224 }
225 
226 static inline void
227 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
228 {
229 
230 	if (likely(wc->wr_id & SDP_OP_SEND)) {
231 		sdp_handle_send_comp(ssk, wc);
232 		return;
233 	}
234 
235 #ifdef SDP_ZCOPY
236 	if (wc->wr_id & SDP_OP_RDMA) {
237 		/* TODO: handle failed RDMA read cqe */
238 
239 		sdp_dbg_data(ssk->socket,
240 	 	    "TX comp: RDMA read. status: %d\n", wc->status);
241 		sdp_prf1(sk, NULL, "TX comp: RDMA read");
242 
243 		if (!ssk->tx_ring.rdma_inflight) {
244 			sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
245 			return;
246 		}
247 
248 		if (!ssk->tx_ring.rdma_inflight->busy) {
249 			sdp_warn(ssk->socket,
250 			    "ERROR: too many RDMA read completions\n");
251 			return;
252 		}
253 
254 		/* Only last RDMA read WR is signalled. Order is guaranteed -
255 		 * therefore if Last RDMA read WR is completed - all other
256 		 * have, too */
257 		ssk->tx_ring.rdma_inflight->busy = 0;
258 		sowwakeup(ssk->socket);
259 		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
260 		return;
261 	}
262 #endif
263 
264 	/* Keepalive probe sent cleanup */
265 	sdp_cnt(sdp_keepalive_probes_sent);
266 
267 	if (likely(!wc->status))
268 		return;
269 
270 	sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
271 			__func__, wc->status);
272 
273 	if (wc->status == IB_WC_WR_FLUSH_ERR)
274 		return;
275 
276 	sdp_notify(ssk, ECONNRESET);
277 }
278 
279 static int
280 sdp_process_tx_cq(struct sdp_sock *ssk)
281 {
282 	struct ib_wc ibwc[SDP_NUM_WC];
283 	int n, i;
284 	int wc_processed = 0;
285 
286 	SDP_WLOCK_ASSERT(ssk);
287 
288 	if (!ssk->tx_ring.cq) {
289 		sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
290 		return 0;
291 	}
292 
293 	do {
294 		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
295 		for (i = 0; i < n; ++i) {
296 			sdp_process_tx_wc(ssk, ibwc + i);
297 			wc_processed++;
298 		}
299 	} while (n == SDP_NUM_WC);
300 
301 	if (wc_processed) {
302 		sdp_post_sends(ssk, M_NOWAIT);
303 		sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d",
304 				(u32) tx_ring_posted(ssk));
305 		sowwakeup(ssk->socket);
306 	}
307 
308 	return wc_processed;
309 }
310 
311 static void
312 sdp_poll_tx(struct sdp_sock *ssk)
313 {
314 	struct socket *sk = ssk->socket;
315 	u32 inflight, wc_processed;
316 
317 	sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
318 		(u32) tx_ring_posted(ssk),
319 		ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
320 
321 	if (unlikely(ssk->state == TCPS_CLOSED)) {
322 		sdp_warn(sk, "Socket is closed\n");
323 		goto out;
324 	}
325 
326 	wc_processed = sdp_process_tx_cq(ssk);
327 	if (!wc_processed)
328 		SDPSTATS_COUNTER_INC(tx_poll_miss);
329 	else
330 		SDPSTATS_COUNTER_INC(tx_poll_hit);
331 
332 	inflight = (u32) tx_ring_posted(ssk);
333 	sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
334 	    inflight);
335 
336 	/* If there are still packets in flight and the timer has not already
337 	 * been scheduled by the Tx routine then schedule it here to guarantee
338 	 * completion processing of these packets */
339 	if (inflight)
340 		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
341 		    sdp_poll_tx_timeout, ssk);
342 out:
343 #ifdef SDP_ZCOPY
344 	if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
345 		sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
346 		sdp_arm_tx_cq(ssk);
347 	}
348 #endif
349 	return;
350 }
351 
352 static void
353 sdp_poll_tx_timeout(void *data)
354 {
355 	struct sdp_sock *ssk = (struct sdp_sock *)data;
356 
357 	if (!callout_active(&ssk->tx_ring.timer))
358 		return;
359 	callout_deactivate(&ssk->tx_ring.timer);
360 	sdp_poll_tx(ssk);
361 }
362 
363 static void
364 sdp_tx_irq(struct ib_cq *cq, void *cq_context)
365 {
366 	struct sdp_sock *ssk;
367 
368 	ssk = cq_context;
369 	sdp_prf1(ssk->socket, NULL, "tx irq");
370 	sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
371 	SDPSTATS_COUNTER_INC(tx_int_count);
372 	SDP_WLOCK(ssk);
373 	sdp_poll_tx(ssk);
374 	SDP_WUNLOCK(ssk);
375 }
376 
377 static
378 void sdp_tx_ring_purge(struct sdp_sock *ssk)
379 {
380 	while (tx_ring_posted(ssk)) {
381 		struct mbuf *mb;
382 		mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
383 		if (!mb)
384 			break;
385 		m_freem(mb);
386 	}
387 }
388 
389 void
390 sdp_post_keepalive(struct sdp_sock *ssk)
391 {
392 	int rc;
393 	struct ib_send_wr wr, *bad_wr;
394 
395 	sdp_dbg(ssk->socket, "%s\n", __func__);
396 
397 	memset(&wr, 0, sizeof(wr));
398 
399 	wr.next    = NULL;
400 	wr.wr_id   = 0;
401 	wr.sg_list = NULL;
402 	wr.num_sge = 0;
403 	wr.opcode  = IB_WR_RDMA_WRITE;
404 
405 	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
406 	if (rc) {
407 		sdp_dbg(ssk->socket,
408 			"ib_post_keepalive failed with status %d.\n", rc);
409 		sdp_notify(ssk, ECONNRESET);
410 	}
411 
412 	sdp_cnt(sdp_keepalive_probes_sent);
413 }
414 
415 static void
416 sdp_tx_cq_event_handler(struct ib_event *event, void *data)
417 {
418 }
419 
420 int
421 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
422 {
423 	struct ib_cq_init_attr tx_cq_attr = {
424 		.cqe = SDP_TX_SIZE,
425 		.comp_vector = 0,
426 		.flags = 0,
427 	};
428 	struct ib_cq *tx_cq;
429 	int rc = 0;
430 
431 	sdp_dbg(ssk->socket, "tx ring create\n");
432 	callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
433 	callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
434 	atomic_set(&ssk->tx_ring.head, 1);
435 	atomic_set(&ssk->tx_ring.tail, 1);
436 
437 	ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
438 	    M_SDP, M_WAITOK);
439 
440 	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
441 			  ssk, &tx_cq_attr);
442 	if (IS_ERR(tx_cq)) {
443 		rc = PTR_ERR(tx_cq);
444 		sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
445 		goto err_cq;
446 	}
447 	ssk->tx_ring.cq = tx_cq;
448 	ssk->tx_ring.poll_cnt = 0;
449 	sdp_arm_tx_cq(ssk);
450 
451 	return 0;
452 
453 err_cq:
454 	free(ssk->tx_ring.buffer, M_SDP);
455 	ssk->tx_ring.buffer = NULL;
456 	return rc;
457 }
458 
459 void
460 sdp_tx_ring_destroy(struct sdp_sock *ssk)
461 {
462 
463 	sdp_dbg(ssk->socket, "tx ring destroy\n");
464 	SDP_WLOCK(ssk);
465 	callout_stop(&ssk->tx_ring.timer);
466 	callout_stop(&ssk->nagle_timer);
467 	SDP_WUNLOCK(ssk);
468 	callout_drain(&ssk->tx_ring.timer);
469 	callout_drain(&ssk->nagle_timer);
470 
471 	if (ssk->tx_ring.buffer) {
472 		sdp_tx_ring_purge(ssk);
473 		free(ssk->tx_ring.buffer, M_SDP);
474 		ssk->tx_ring.buffer = NULL;
475 	}
476 
477 	if (ssk->tx_ring.cq) {
478 		if (ib_destroy_cq(ssk->tx_ring.cq)) {
479 			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
480 					ssk->tx_ring.cq);
481 		} else {
482 			ssk->tx_ring.cq = NULL;
483 		}
484 	}
485 
486 	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
487 }
488