/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

#define sdp_cnt(var) do { (var)++; } while (0)

SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
    "Total number of keepalive probes sent.");

static int sdp_process_tx_cq(struct sdp_sock *ssk);
static void sdp_poll_tx_timeout(void *data);

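/*
 * Opportunistically poll the TX completion queue.  To keep polling cheap,
 * the CQ is only drained on every SDP_TX_POLL_MODER-th call (the mask
 * arithmetic assumes a power of two) unless 'force' is set; a timer is
 * also armed so completions are still reaped if the interface goes idle.
 */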
int
sdp_xmit_poll(struct sdp_sock *ssk, int force)
{
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);
	sdp_prf(ssk->socket, NULL, "%s", __func__);

	/*
	 * If we don't have a pending timer, set one up to catch our recent
	 * post in case the interface becomes idle.
	 */
	if (!callout_pending(&ssk->tx_ring.timer))
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);

	/* Poll the CQ every SDP_TX_POLL_MODER packets. */
	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
		wc_processed = sdp_process_tx_cq(ssk);

	return wc_processed;
}

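/*
 * Post a single mbuf chain on the send queue.  The BSDH header at the
 * front of the chain is stamped with the local sequence number, the
 * acknowledged remote sequence number and the number of posted receive
 * buffers (the credits advertised to the peer); each mbuf in the chain
 * is then DMA-mapped into one scatter/gather entry of a signaled
 * IB_WR_SEND work request.  On success the ring head is advanced, one
 * TX credit is consumed and remote_credits is refreshed.
 */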
void
sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_buf *tx_req;
	struct sdp_bsdh *h;
	unsigned long mseq;
	struct ib_device *dev;
	const struct ib_send_wr *bad_wr;
	struct ib_sge ibsge[SDP_MAX_SEND_SGES];
	struct ib_sge *sge;
	struct ib_send_wr tx_wr = { NULL };
	int i, rc;
	u64 addr;

	if (!ssk->qp_active) {
		m_freem(mb);
		return;
	}

	mseq = ring_head(ssk->tx_ring);
	h = mtod(mb, struct sdp_bsdh *);
	ssk->tx_packets++;
	ssk->tx_bytes += mb->m_pkthdr.len;

	/* The stats macros use the BSDH mid, so 'h' must be set first. */
	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
	SDPSTATS_HIST(send_size, mb->m_pkthdr.len);

#ifdef SDP_ZCOPY
	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
		if (ssk->tx_sa != tx_sa) {
			sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
			    "before being sent!\n");
			WARN_ON(1);
			m_freem(mb);
			return;
		}
		TX_SRCAVAIL_STATE(mb)->mseq = mseq;
	}
#endif

	if (unlikely(mb->m_flags & M_URG))
		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
	else
		h->flags = 0;

	mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
	h->bufs = htons(rx_ring_posted(ssk));
	h->len = htonl(mb->m_pkthdr.len);
	h->mseq = htonl(mseq);
	h->mseq_ack = htonl(mseq_ack(ssk));

	sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
	    mid2str(h->mid), rx_ring_posted(ssk), mseq,
	    ntohl(h->mseq_ack));

	SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);

	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
	tx_req->mb = mb;
	dev = ssk->ib_device;
	sge = &ibsge[0];
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_TO_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_SEND_SGES);
		tx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
	}
	tx_wr.next = NULL;
	tx_wr.wr_id = mseq | SDP_OP_SEND;
	tx_wr.sg_list = ibsge;
	tx_wr.num_sge = i;
	tx_wr.opcode = IB_WR_SEND;
	tx_wr.send_flags = IB_SEND_SIGNALED;
	if (unlikely(tx_req->mb->m_flags & M_URG))
		tx_wr.send_flags |= IB_SEND_SOLICITED;

	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_dbg(ssk->socket,
		    "ib_post_send failed with status %d.\n", rc);

		sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

		sdp_notify(ssk, ECONNRESET);
		m_freem(tx_req->mb);
		return;
	}

	atomic_inc(&ssk->tx_ring.head);
	atomic_dec(&ssk->tx_ring.credits);
	atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));

	return;
}

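/*
 * Reclaim the TX ring slot for a completed send.  'mseq' is recovered
 * from the work request id and must match the current ring tail, since
 * sends are posted and completed in order.  Returns the mbuf so the
 * caller can free it, or NULL on a bogus completion.
 */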
static struct mbuf *
sdp_send_completion(struct sdp_sock *ssk, int mseq)
{
	struct ib_device *dev;
	struct sdp_buf *tx_req;
	struct mbuf *mb = NULL;
	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;

	if (unlikely(mseq != ring_tail(*tx_ring))) {
		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
		    mseq, ring_tail(*tx_ring));
		goto out;
	}

	dev = ssk->ib_device;
	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
	mb = tx_req->mb;
	sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

#ifdef SDP_ZCOPY
	/* TODO: AIO and real zcopy code; add their context support here */
	if (BZCOPY_STATE(mb))
		BZCOPY_STATE(mb)->busy--;
#endif

	atomic_inc(&tx_ring->tail);

out:
	return mb;
}

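/*
 * Handle a send work completion: report errors (other than flushes seen
 * while the QP is being torn down) as a connection reset, then release
 * the completed mbuf.
 */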
static int
sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb = NULL;
	struct sdp_bsdh *h;

	if (unlikely(wc->status)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			sdp_prf(ssk->socket, mb, "Send completion with error. "
			    "Status %d", wc->status);
			sdp_dbg_data(ssk->socket, "Send completion with error. "
			    "Status %d\n", wc->status);
			sdp_notify(ssk, ECONNRESET);
		}
	}

	mb = sdp_send_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return -1;

	h = mtod(mb, struct sdp_bsdh *);
	sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
	sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d\n",
	    mb, mb->m_pkthdr.len, ntohl(h->mseq));
	m_freem(mb);

	return 0;
}

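/*
 * Dispatch one TX work completion.  The wr_id encodes the work request
 * type: SDP_OP_SEND for data sends, SDP_OP_RDMA for zero-copy RDMA reads
 * (only when SDP_ZCOPY is enabled); anything else (wr_id 0, posted by
 * sdp_post_keepalive()) is treated as a completed keepalive probe.
 */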
static inline void
sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{

	if (likely(wc->wr_id & SDP_OP_SEND)) {
		sdp_handle_send_comp(ssk, wc);
		return;
	}

#ifdef SDP_ZCOPY
	if (wc->wr_id & SDP_OP_RDMA) {
		/* TODO: handle failed RDMA read cqe */

		sdp_dbg_data(ssk->socket,
		    "TX comp: RDMA read. status: %d\n", wc->status);
		sdp_prf1(ssk->socket, NULL, "TX comp: RDMA read");

		if (!ssk->tx_ring.rdma_inflight) {
			sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
			return;
		}

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_warn(ssk->socket,
			    "ERROR: too many RDMA read completions\n");
			return;
		}

		/*
		 * Only the last RDMA read WR is signalled, and ordering is
		 * guaranteed, so once the last RDMA read WR has completed,
		 * all the earlier ones have completed too.
		 */
		ssk->tx_ring.rdma_inflight->busy = 0;
		sowwakeup(ssk->socket);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
		return;
	}
#endif

	/* Keepalive probe sent cleanup */
	sdp_cnt(sdp_keepalive_probes_sent);

	if (likely(!wc->status))
		return;

	sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
	    __func__, wc->status);

	if (wc->status == IB_WC_WR_FLUSH_ERR)
		return;

	sdp_notify(ssk, ECONNRESET);
}

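/*
 * Drain the TX completion queue.  Completions are polled in batches of
 * SDP_NUM_WC until the CQ is empty; if any were reaped, credits may have
 * been freed, so more sends are attempted and writers blocked in sendmsg
 * are woken up.
 */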
static int
sdp_process_tx_cq(struct sdp_sock *ssk)
{
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);

	if (!ssk->tx_ring.cq) {
		sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
		return 0;
	}

	do {
		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			sdp_process_tx_wc(ssk, ibwc + i);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed) {
		sdp_post_sends(ssk, M_NOWAIT);
		sdp_prf1(ssk->socket, NULL, "Waking sendmsg. inflight=%d",
		    (u32) tx_ring_posted(ssk));
		sowwakeup(ssk->socket);
	}

	return wc_processed;
}

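/*
 * Timer-driven TX polling.  Reaps any outstanding completions and, if
 * packets are still in flight, re-arms the timer so their completions
 * are eventually processed even without further transmit activity.
 */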
static void
sdp_poll_tx(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	u32 inflight, wc_processed;

	sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
	    (u32) tx_ring_posted(ssk),
	    ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));

	if (unlikely(ssk->state == TCPS_CLOSED)) {
		sdp_warn(sk, "Socket is closed\n");
		goto out;
	}

	wc_processed = sdp_process_tx_cq(ssk);
	if (!wc_processed)
		SDPSTATS_COUNTER_INC(tx_poll_miss);
	else
		SDPSTATS_COUNTER_INC(tx_poll_hit);

	inflight = (u32) tx_ring_posted(ssk);
	sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
	    inflight);

	/*
	 * If there are still packets in flight and the timer has not already
	 * been scheduled by the Tx routine, then schedule it here to
	 * guarantee completion processing of these packets.
	 */
	if (inflight)
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);
out:
#ifdef SDP_ZCOPY
	if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
		sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
		sdp_arm_tx_cq(ssk);
	}
#endif
	return;
}

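/*
 * Callout handler for the TX polling timer.  It runs with the socket
 * lock held, since the callout is initialized with callout_init_rw()
 * on ssk->lock.
 */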
static void
sdp_poll_tx_timeout(void *data)
{
	struct sdp_sock *ssk = (struct sdp_sock *)data;

	if (!callout_active(&ssk->tx_ring.timer))
		return;
	callout_deactivate(&ssk->tx_ring.timer);
	sdp_poll_tx(ssk);
}

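/*
 * TX completion interrupt handler, passed to ib_create_cq().  Takes the
 * socket write lock and drains the completion queue.
 */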
static void
sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
	struct sdp_sock *ssk;

	ssk = cq_context;
	sdp_prf1(ssk->socket, NULL, "tx irq");
	sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
	SDPSTATS_COUNTER_INC(tx_int_count);
	SDP_WLOCK(ssk);
	sdp_poll_tx(ssk);
	SDP_WUNLOCK(ssk);
}

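/*
 * Reclaim and free every mbuf still posted on the TX ring; used by
 * sdp_tx_ring_destroy() during teardown.
 */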
static void
sdp_tx_ring_purge(struct sdp_sock *ssk)
{
	while (tx_ring_posted(ssk)) {
		struct mbuf *mb;
		mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

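/*
 * Send an SDP keepalive probe: a zero-length RDMA write with no
 * scatter/gather entries and wr_id 0, which generates traffic on the
 * connection without transferring any data.
 */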
void
sdp_post_keepalive(struct sdp_sock *ssk)
{
	int rc;
	struct ib_send_wr wr;
	const struct ib_send_wr *bad_wr;

	sdp_dbg(ssk->socket, "%s\n", __func__);

	memset(&wr, 0, sizeof(wr));

	wr.next = NULL;
	wr.wr_id = 0;
	wr.sg_list = NULL;
	wr.num_sge = 0;
	wr.opcode = IB_WR_RDMA_WRITE;

	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
	if (rc) {
		sdp_dbg(ssk->socket,
		    "ib_post_keepalive failed with status %d.\n", rc);
		sdp_notify(ssk, ECONNRESET);
	}

	sdp_cnt(sdp_keepalive_probes_sent);
}

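/*
 * Asynchronous event handler for the TX CQ, required by ib_create_cq();
 * CQ events are ignored here.
 */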
static void
sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
}

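/*
 * Allocate the TX ring buffer and completion queue, initialize the
 * polling and nagle timers, and arm the CQ for completion interrupts.
 * The ring head and tail both start at 1 (empty ring).
 */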
int
sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq_init_attr tx_cq_attr = {
		.cqe = SDP_TX_SIZE,
		.comp_vector = 0,
		.flags = 0,
	};
	struct ib_cq *tx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "tx ring create\n");
	callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
	callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
	atomic_set(&ssk->tx_ring.head, 1);
	atomic_set(&ssk->tx_ring.tail, 1);

	ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
	    M_SDP, M_WAITOK);

	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
	    ssk, &tx_cq_attr);
	if (IS_ERR(tx_cq)) {
		rc = PTR_ERR(tx_cq);
		sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
		goto err_cq;
	}
	ssk->tx_ring.cq = tx_cq;
	ssk->tx_ring.poll_cnt = 0;
	sdp_arm_tx_cq(ssk);

	return 0;

err_cq:
	free(ssk->tx_ring.buffer, M_SDP);
	ssk->tx_ring.buffer = NULL;
	return rc;
}

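/*
 * Tear down the TX ring: stop and drain the timers, free any mbufs
 * still posted on the ring, and destroy the completion queue.
 */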
void
sdp_tx_ring_destroy(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "tx ring destroy\n");
	SDP_WLOCK(ssk);
	callout_stop(&ssk->tx_ring.timer);
	callout_stop(&ssk->nagle_timer);
	SDP_WUNLOCK(ssk);
	callout_drain(&ssk->tx_ring.timer);
	callout_drain(&ssk->nagle_timer);

	if (ssk->tx_ring.buffer) {
		sdp_tx_ring_purge(ssk);
		free(ssk->tx_ring.buffer, M_SDP);
		ssk->tx_ring.buffer = NULL;
	}

	if (ssk->tx_ring.cq) {
		ib_destroy_cq(ssk->tx_ring.cq);
		ssk->tx_ring.cq = NULL;
	}

	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
}