1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0 3 * 4 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the 10 * OpenIB.org BSD license below: 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above 17 * copyright notice, this list of conditions and the following 18 * disclaimer. 19 * 20 * - Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials 23 * provided with the distribution. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32 * SOFTWARE. 33 */ 34 #include "sdp.h" 35 36 #define sdp_cnt(var) do { (var)++; } while (0) 37 38 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0, 39 "Total number of keepalive probes sent."); 40 41 static int sdp_process_tx_cq(struct sdp_sock *ssk); 42 static void sdp_poll_tx_timeout(void *data); 43 44 int 45 sdp_xmit_poll(struct sdp_sock *ssk, int force) 46 { 47 int wc_processed = 0; 48 49 SDP_WLOCK_ASSERT(ssk); 50 sdp_prf(ssk->socket, NULL, "%s", __func__); 51 52 /* If we don't have a pending timer, set one up to catch our recent 53 post in case the interface becomes idle */ 54 if (!callout_pending(&ssk->tx_ring.timer)) 55 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT, 56 sdp_poll_tx_timeout, ssk); 57 58 /* Poll the CQ every SDP_TX_POLL_MODER packets */ 59 if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) 60 wc_processed = sdp_process_tx_cq(ssk); 61 62 return wc_processed; 63 } 64 65 void 66 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb) 67 { 68 struct sdp_buf *tx_req; 69 struct sdp_bsdh *h; 70 unsigned long mseq; 71 struct ib_device *dev; 72 struct ib_send_wr *bad_wr; 73 struct ib_sge ibsge[SDP_MAX_SEND_SGES]; 74 struct ib_sge *sge; 75 struct ib_send_wr tx_wr = { NULL }; 76 int i, rc; 77 u64 addr; 78 79 SDPSTATS_COUNTER_MID_INC(post_send, h->mid); 80 SDPSTATS_HIST(send_size, mb->len); 81 82 if (!ssk->qp_active) { 83 m_freem(mb); 84 return; 85 } 86 87 mseq = ring_head(ssk->tx_ring); 88 h = mtod(mb, struct sdp_bsdh *); 89 ssk->tx_packets++; 90 ssk->tx_bytes += mb->m_pkthdr.len; 91 92 #ifdef SDP_ZCOPY 93 if (unlikely(h->mid == SDP_MID_SRCAVAIL)) { 94 struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb); 95 if (ssk->tx_sa != tx_sa) { 96 sdp_dbg_data(ssk->socket, "SrcAvail cancelled " 97 "before being sent!\n"); 98 WARN_ON(1); 99 m_freem(mb); 100 return; 101 } 102 TX_SRCAVAIL_STATE(mb)->mseq = mseq; 103 } 104 #endif 105 106 if (unlikely(mb->m_flags & M_URG)) 107 h->flags = SDP_OOB_PRES | SDP_OOB_PEND; 108 else 109 h->flags = 0; 110 111 mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */ 112 h->bufs = htons(rx_ring_posted(ssk)); 113 h->len = htonl(mb->m_pkthdr.len); 114 h->mseq = htonl(mseq); 115 h->mseq_ack = htonl(mseq_ack(ssk)); 116 117 sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d", 118 mid2str(h->mid), rx_ring_posted(ssk), mseq, 119 ntohl(h->mseq_ack)); 120 121 SDP_DUMP_PACKET(ssk->socket, "TX", mb, h); 122 123 tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)]; 124 tx_req->mb = mb; 125 dev = ssk->ib_device; 126 sge = &ibsge[0]; 127 for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) { 128 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len, 129 DMA_TO_DEVICE); 130 /* TODO: proper error handling */ 131 BUG_ON(ib_dma_mapping_error(dev, addr)); 132 BUG_ON(i >= SDP_MAX_SEND_SGES); 133 tx_req->mapping[i] = addr; 134 sge->addr = addr; 135 sge->length = mb->m_len; 136 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey; 137 } 138 tx_wr.next = NULL; 139 tx_wr.wr_id = mseq | SDP_OP_SEND; 140 tx_wr.sg_list = ibsge; 141 tx_wr.num_sge = i; 142 tx_wr.opcode = IB_WR_SEND; 143 tx_wr.send_flags = IB_SEND_SIGNALED; 144 if (unlikely(tx_req->mb->m_flags & M_URG)) 145 tx_wr.send_flags |= IB_SEND_SOLICITED; 146 147 rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr); 148 if (unlikely(rc)) { 149 sdp_dbg(ssk->socket, 150 "ib_post_send failed with status %d.\n", rc); 151 152 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE); 153 154 sdp_notify(ssk, ECONNRESET); 155 m_freem(tx_req->mb); 156 return; 157 } 158 159 atomic_inc(&ssk->tx_ring.head); 160 atomic_dec(&ssk->tx_ring.credits); 161 atomic_set(&ssk->remote_credits, rx_ring_posted(ssk)); 162 163 return; 164 } 165 166 static struct mbuf * 167 sdp_send_completion(struct sdp_sock *ssk, int mseq) 168 { 169 struct ib_device *dev; 170 struct sdp_buf *tx_req; 171 struct mbuf *mb = NULL; 172 struct sdp_tx_ring *tx_ring = &ssk->tx_ring; 173 174 if (unlikely(mseq != ring_tail(*tx_ring))) { 175 printk(KERN_WARNING "Bogus send completion id %d tail %d\n", 176 mseq, ring_tail(*tx_ring)); 177 goto out; 178 } 179 180 dev = ssk->ib_device; 181 tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)]; 182 mb = tx_req->mb; 183 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE); 184 185 #ifdef SDP_ZCOPY 186 /* TODO: AIO and real zcopy code; add their context support here */ 187 if (BZCOPY_STATE(mb)) 188 BZCOPY_STATE(mb)->busy--; 189 #endif 190 191 atomic_inc(&tx_ring->tail); 192 193 out: 194 return mb; 195 } 196 197 static int 198 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) 199 { 200 struct mbuf *mb = NULL; 201 struct sdp_bsdh *h; 202 203 if (unlikely(wc->status)) { 204 if (wc->status != IB_WC_WR_FLUSH_ERR) { 205 sdp_prf(ssk->socket, mb, "Send completion with error. " 206 "Status %d", wc->status); 207 sdp_dbg_data(ssk->socket, "Send completion with error. " 208 "Status %d\n", wc->status); 209 sdp_notify(ssk, ECONNRESET); 210 } 211 } 212 213 mb = sdp_send_completion(ssk, wc->wr_id); 214 if (unlikely(!mb)) 215 return -1; 216 217 h = mtod(mb, struct sdp_bsdh *); 218 sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq)); 219 sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d", 220 mb, mb->m_pkthdr.len, ntohl(h->mseq)); 221 m_freem(mb); 222 223 return 0; 224 } 225 226 static inline void 227 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) 228 { 229 230 if (likely(wc->wr_id & SDP_OP_SEND)) { 231 sdp_handle_send_comp(ssk, wc); 232 return; 233 } 234 235 #ifdef SDP_ZCOPY 236 if (wc->wr_id & SDP_OP_RDMA) { 237 /* TODO: handle failed RDMA read cqe */ 238 239 sdp_dbg_data(ssk->socket, 240 "TX comp: RDMA read. status: %d\n", wc->status); 241 sdp_prf1(sk, NULL, "TX comp: RDMA read"); 242 243 if (!ssk->tx_ring.rdma_inflight) { 244 sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n"); 245 return; 246 } 247 248 if (!ssk->tx_ring.rdma_inflight->busy) { 249 sdp_warn(ssk->socket, 250 "ERROR: too many RDMA read completions\n"); 251 return; 252 } 253 254 /* Only last RDMA read WR is signalled. Order is guaranteed - 255 * therefore if Last RDMA read WR is completed - all other 256 * have, too */ 257 ssk->tx_ring.rdma_inflight->busy = 0; 258 sowwakeup(ssk->socket); 259 sdp_dbg_data(ssk->socket, "woke up sleepers\n"); 260 return; 261 } 262 #endif 263 264 /* Keepalive probe sent cleanup */ 265 sdp_cnt(sdp_keepalive_probes_sent); 266 267 if (likely(!wc->status)) 268 return; 269 270 sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n", 271 __func__, wc->status); 272 273 if (wc->status == IB_WC_WR_FLUSH_ERR) 274 return; 275 276 sdp_notify(ssk, ECONNRESET); 277 } 278 279 static int 280 sdp_process_tx_cq(struct sdp_sock *ssk) 281 { 282 struct ib_wc ibwc[SDP_NUM_WC]; 283 int n, i; 284 int wc_processed = 0; 285 286 SDP_WLOCK_ASSERT(ssk); 287 288 if (!ssk->tx_ring.cq) { 289 sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n"); 290 return 0; 291 } 292 293 do { 294 n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc); 295 for (i = 0; i < n; ++i) { 296 sdp_process_tx_wc(ssk, ibwc + i); 297 wc_processed++; 298 } 299 } while (n == SDP_NUM_WC); 300 301 if (wc_processed) { 302 sdp_post_sends(ssk, M_NOWAIT); 303 sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d", 304 (u32) tx_ring_posted(ssk)); 305 sowwakeup(ssk->socket); 306 } 307 308 return wc_processed; 309 } 310 311 static void 312 sdp_poll_tx(struct sdp_sock *ssk) 313 { 314 struct socket *sk = ssk->socket; 315 u32 inflight, wc_processed; 316 317 sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d", 318 (u32) tx_ring_posted(ssk), 319 ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring)); 320 321 if (unlikely(ssk->state == TCPS_CLOSED)) { 322 sdp_warn(sk, "Socket is closed\n"); 323 goto out; 324 } 325 326 wc_processed = sdp_process_tx_cq(ssk); 327 if (!wc_processed) 328 SDPSTATS_COUNTER_INC(tx_poll_miss); 329 else 330 SDPSTATS_COUNTER_INC(tx_poll_hit); 331 332 inflight = (u32) tx_ring_posted(ssk); 333 sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d", 334 inflight); 335 336 /* If there are still packets in flight and the timer has not already 337 * been scheduled by the Tx routine then schedule it here to guarantee 338 * completion processing of these packets */ 339 if (inflight) 340 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT, 341 sdp_poll_tx_timeout, ssk); 342 out: 343 #ifdef SDP_ZCOPY 344 if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) { 345 sdp_prf1(sk, NULL, "RDMA is inflight - arming irq"); 346 sdp_arm_tx_cq(ssk); 347 } 348 #endif 349 return; 350 } 351 352 static void 353 sdp_poll_tx_timeout(void *data) 354 { 355 struct sdp_sock *ssk = (struct sdp_sock *)data; 356 357 if (!callout_active(&ssk->tx_ring.timer)) 358 return; 359 callout_deactivate(&ssk->tx_ring.timer); 360 sdp_poll_tx(ssk); 361 } 362 363 static void 364 sdp_tx_irq(struct ib_cq *cq, void *cq_context) 365 { 366 struct sdp_sock *ssk; 367 368 ssk = cq_context; 369 sdp_prf1(ssk->socket, NULL, "tx irq"); 370 sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n"); 371 SDPSTATS_COUNTER_INC(tx_int_count); 372 SDP_WLOCK(ssk); 373 sdp_poll_tx(ssk); 374 SDP_WUNLOCK(ssk); 375 } 376 377 static 378 void sdp_tx_ring_purge(struct sdp_sock *ssk) 379 { 380 while (tx_ring_posted(ssk)) { 381 struct mbuf *mb; 382 mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring)); 383 if (!mb) 384 break; 385 m_freem(mb); 386 } 387 } 388 389 void 390 sdp_post_keepalive(struct sdp_sock *ssk) 391 { 392 int rc; 393 struct ib_send_wr wr, *bad_wr; 394 395 sdp_dbg(ssk->socket, "%s\n", __func__); 396 397 memset(&wr, 0, sizeof(wr)); 398 399 wr.next = NULL; 400 wr.wr_id = 0; 401 wr.sg_list = NULL; 402 wr.num_sge = 0; 403 wr.opcode = IB_WR_RDMA_WRITE; 404 405 rc = ib_post_send(ssk->qp, &wr, &bad_wr); 406 if (rc) { 407 sdp_dbg(ssk->socket, 408 "ib_post_keepalive failed with status %d.\n", rc); 409 sdp_notify(ssk, ECONNRESET); 410 } 411 412 sdp_cnt(sdp_keepalive_probes_sent); 413 } 414 415 static void 416 sdp_tx_cq_event_handler(struct ib_event *event, void *data) 417 { 418 } 419 420 int 421 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) 422 { 423 struct ib_cq_init_attr tx_cq_attr = { 424 .cqe = SDP_TX_SIZE, 425 .comp_vector = 0, 426 .flags = 0, 427 }; 428 struct ib_cq *tx_cq; 429 int rc = 0; 430 431 sdp_dbg(ssk->socket, "tx ring create\n"); 432 callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0); 433 callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0); 434 atomic_set(&ssk->tx_ring.head, 1); 435 atomic_set(&ssk->tx_ring.tail, 1); 436 437 ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE, 438 M_SDP, M_WAITOK); 439 440 tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler, 441 ssk, &tx_cq_attr); 442 if (IS_ERR(tx_cq)) { 443 rc = PTR_ERR(tx_cq); 444 sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc); 445 goto err_cq; 446 } 447 ssk->tx_ring.cq = tx_cq; 448 ssk->tx_ring.poll_cnt = 0; 449 sdp_arm_tx_cq(ssk); 450 451 return 0; 452 453 err_cq: 454 free(ssk->tx_ring.buffer, M_SDP); 455 ssk->tx_ring.buffer = NULL; 456 return rc; 457 } 458 459 void 460 sdp_tx_ring_destroy(struct sdp_sock *ssk) 461 { 462 463 sdp_dbg(ssk->socket, "tx ring destroy\n"); 464 SDP_WLOCK(ssk); 465 callout_stop(&ssk->tx_ring.timer); 466 callout_stop(&ssk->nagle_timer); 467 SDP_WUNLOCK(ssk); 468 callout_drain(&ssk->tx_ring.timer); 469 callout_drain(&ssk->nagle_timer); 470 471 if (ssk->tx_ring.buffer) { 472 sdp_tx_ring_purge(ssk); 473 free(ssk->tx_ring.buffer, M_SDP); 474 ssk->tx_ring.buffer = NULL; 475 } 476 477 if (ssk->tx_ring.cq) { 478 if (ib_destroy_cq(ssk->tx_ring.cq)) { 479 sdp_warn(ssk->socket, "destroy cq(%p) failed\n", 480 ssk->tx_ring.cq); 481 } else { 482 ssk->tx_ring.cq = NULL; 483 } 484 } 485 486 WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring)); 487 } 488