1 /* 2 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 */ 32 #include "sdp.h" 33 34 #define sdp_cnt(var) do { (var)++; } while (0) 35 36 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0, 37 "Total number of keepalive probes sent."); 38 39 static int sdp_process_tx_cq(struct sdp_sock *ssk); 40 static void sdp_poll_tx_timeout(void *data); 41 42 int 43 sdp_xmit_poll(struct sdp_sock *ssk, int force) 44 { 45 int wc_processed = 0; 46 47 SDP_WLOCK_ASSERT(ssk); 48 sdp_prf(ssk->socket, NULL, "%s", __func__); 49 50 /* If we don't have a pending timer, set one up to catch our recent 51 post in case the interface becomes idle */ 52 if (!callout_pending(&ssk->tx_ring.timer)) 53 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT, 54 sdp_poll_tx_timeout, ssk); 55 56 /* Poll the CQ every SDP_TX_POLL_MODER packets */ 57 if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) 58 wc_processed = sdp_process_tx_cq(ssk); 59 60 return wc_processed; 61 } 62 63 void 64 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb) 65 { 66 struct sdp_buf *tx_req; 67 struct sdp_bsdh *h; 68 unsigned long mseq; 69 struct ib_device *dev; 70 struct ib_send_wr *bad_wr; 71 struct ib_sge ibsge[SDP_MAX_SEND_SGES]; 72 struct ib_sge *sge; 73 struct ib_send_wr tx_wr = { NULL }; 74 int i, rc; 75 u64 addr; 76 77 SDPSTATS_COUNTER_MID_INC(post_send, h->mid); 78 SDPSTATS_HIST(send_size, mb->len); 79 80 if (!ssk->qp_active) { 81 m_freem(mb); 82 return; 83 } 84 85 mseq = ring_head(ssk->tx_ring); 86 h = mtod(mb, struct sdp_bsdh *); 87 ssk->tx_packets++; 88 ssk->tx_bytes += mb->m_pkthdr.len; 89 90 #ifdef SDP_ZCOPY 91 if (unlikely(h->mid == SDP_MID_SRCAVAIL)) { 92 struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb); 93 if (ssk->tx_sa != tx_sa) { 94 sdp_dbg_data(ssk->socket, "SrcAvail cancelled " 95 "before being sent!\n"); 96 WARN_ON(1); 97 m_freem(mb); 98 return; 99 } 100 TX_SRCAVAIL_STATE(mb)->mseq = mseq; 101 } 102 #endif 103 104 if (unlikely(mb->m_flags & M_URG)) 105 h->flags = SDP_OOB_PRES | SDP_OOB_PEND; 106 else 107 h->flags = 0; 108 109 mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */ 110 h->bufs = htons(rx_ring_posted(ssk)); 111 h->len = htonl(mb->m_pkthdr.len); 112 h->mseq = htonl(mseq); 113 h->mseq_ack = htonl(mseq_ack(ssk)); 114 115 sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d", 116 mid2str(h->mid), rx_ring_posted(ssk), mseq, 117 ntohl(h->mseq_ack)); 118 119 SDP_DUMP_PACKET(ssk->socket, "TX", mb, h); 120 121 tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)]; 122 tx_req->mb = mb; 123 dev = ssk->ib_device; 124 sge = &ibsge[0]; 125 for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) { 126 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len, 127 DMA_TO_DEVICE); 128 /* TODO: proper error handling */ 129 BUG_ON(ib_dma_mapping_error(dev, addr)); 130 BUG_ON(i >= SDP_MAX_SEND_SGES); 131 tx_req->mapping[i] = addr; 132 sge->addr = addr; 133 sge->length = mb->m_len; 134 sge->lkey = ssk->sdp_dev->mr->lkey; 135 } 136 tx_wr.next = NULL; 137 tx_wr.wr_id = mseq | SDP_OP_SEND; 138 tx_wr.sg_list = ibsge; 139 tx_wr.num_sge = i; 140 tx_wr.opcode = IB_WR_SEND; 141 tx_wr.send_flags = IB_SEND_SIGNALED; 142 if (unlikely(tx_req->mb->m_flags & M_URG)) 143 tx_wr.send_flags |= IB_SEND_SOLICITED; 144 145 rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr); 146 if (unlikely(rc)) { 147 sdp_dbg(ssk->socket, 148 "ib_post_send failed with status %d.\n", rc); 149 150 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE); 151 152 sdp_notify(ssk, ECONNRESET); 153 m_freem(tx_req->mb); 154 return; 155 } 156 157 atomic_inc(&ssk->tx_ring.head); 158 atomic_dec(&ssk->tx_ring.credits); 159 atomic_set(&ssk->remote_credits, rx_ring_posted(ssk)); 160 161 return; 162 } 163 164 static struct mbuf * 165 sdp_send_completion(struct sdp_sock *ssk, int mseq) 166 { 167 struct ib_device *dev; 168 struct sdp_buf *tx_req; 169 struct mbuf *mb = NULL; 170 struct sdp_tx_ring *tx_ring = &ssk->tx_ring; 171 172 if (unlikely(mseq != ring_tail(*tx_ring))) { 173 printk(KERN_WARNING "Bogus send completion id %d tail %d\n", 174 mseq, ring_tail(*tx_ring)); 175 goto out; 176 } 177 178 dev = ssk->ib_device; 179 tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)]; 180 mb = tx_req->mb; 181 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE); 182 183 #ifdef SDP_ZCOPY 184 /* TODO: AIO and real zcopy code; add their context support here */ 185 if (BZCOPY_STATE(mb)) 186 BZCOPY_STATE(mb)->busy--; 187 #endif 188 189 atomic_inc(&tx_ring->tail); 190 191 out: 192 return mb; 193 } 194 195 static int 196 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) 197 { 198 struct mbuf *mb = NULL; 199 struct sdp_bsdh *h; 200 201 if (unlikely(wc->status)) { 202 if (wc->status != IB_WC_WR_FLUSH_ERR) { 203 sdp_prf(ssk->socket, mb, "Send completion with error. " 204 "Status %d", wc->status); 205 sdp_dbg_data(ssk->socket, "Send completion with error. " 206 "Status %d\n", wc->status); 207 sdp_notify(ssk, ECONNRESET); 208 } 209 } 210 211 mb = sdp_send_completion(ssk, wc->wr_id); 212 if (unlikely(!mb)) 213 return -1; 214 215 h = mtod(mb, struct sdp_bsdh *); 216 sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq)); 217 sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d", 218 mb, mb->m_pkthdr.len, ntohl(h->mseq)); 219 m_freem(mb); 220 221 return 0; 222 } 223 224 static inline void 225 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) 226 { 227 228 if (likely(wc->wr_id & SDP_OP_SEND)) { 229 sdp_handle_send_comp(ssk, wc); 230 return; 231 } 232 233 #ifdef SDP_ZCOPY 234 if (wc->wr_id & SDP_OP_RDMA) { 235 /* TODO: handle failed RDMA read cqe */ 236 237 sdp_dbg_data(ssk->socket, 238 "TX comp: RDMA read. status: %d\n", wc->status); 239 sdp_prf1(sk, NULL, "TX comp: RDMA read"); 240 241 if (!ssk->tx_ring.rdma_inflight) { 242 sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n"); 243 return; 244 } 245 246 if (!ssk->tx_ring.rdma_inflight->busy) { 247 sdp_warn(ssk->socket, 248 "ERROR: too many RDMA read completions\n"); 249 return; 250 } 251 252 /* Only last RDMA read WR is signalled. Order is guaranteed - 253 * therefore if Last RDMA read WR is completed - all other 254 * have, too */ 255 ssk->tx_ring.rdma_inflight->busy = 0; 256 sowwakeup(ssk->socket); 257 sdp_dbg_data(ssk->socket, "woke up sleepers\n"); 258 return; 259 } 260 #endif 261 262 /* Keepalive probe sent cleanup */ 263 sdp_cnt(sdp_keepalive_probes_sent); 264 265 if (likely(!wc->status)) 266 return; 267 268 sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n", 269 __func__, wc->status); 270 271 if (wc->status == IB_WC_WR_FLUSH_ERR) 272 return; 273 274 sdp_notify(ssk, ECONNRESET); 275 } 276 277 static int 278 sdp_process_tx_cq(struct sdp_sock *ssk) 279 { 280 struct ib_wc ibwc[SDP_NUM_WC]; 281 int n, i; 282 int wc_processed = 0; 283 284 SDP_WLOCK_ASSERT(ssk); 285 286 if (!ssk->tx_ring.cq) { 287 sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n"); 288 return 0; 289 } 290 291 do { 292 n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc); 293 for (i = 0; i < n; ++i) { 294 sdp_process_tx_wc(ssk, ibwc + i); 295 wc_processed++; 296 } 297 } while (n == SDP_NUM_WC); 298 299 if (wc_processed) { 300 sdp_post_sends(ssk, M_NOWAIT); 301 sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d", 302 (u32) tx_ring_posted(ssk)); 303 sowwakeup(ssk->socket); 304 } 305 306 return wc_processed; 307 } 308 309 static void 310 sdp_poll_tx(struct sdp_sock *ssk) 311 { 312 struct socket *sk = ssk->socket; 313 u32 inflight, wc_processed; 314 315 sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d", 316 (u32) tx_ring_posted(ssk), 317 ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring)); 318 319 if (unlikely(ssk->state == TCPS_CLOSED)) { 320 sdp_warn(sk, "Socket is closed\n"); 321 goto out; 322 } 323 324 wc_processed = sdp_process_tx_cq(ssk); 325 if (!wc_processed) 326 SDPSTATS_COUNTER_INC(tx_poll_miss); 327 else 328 SDPSTATS_COUNTER_INC(tx_poll_hit); 329 330 inflight = (u32) tx_ring_posted(ssk); 331 sdp_prf1(ssk->socket, NULL, "finished tx proccessing. inflight = %d", 332 inflight); 333 334 /* If there are still packets in flight and the timer has not already 335 * been scheduled by the Tx routine then schedule it here to guarantee 336 * completion processing of these packets */ 337 if (inflight) 338 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT, 339 sdp_poll_tx_timeout, ssk); 340 out: 341 #ifdef SDP_ZCOPY 342 if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) { 343 sdp_prf1(sk, NULL, "RDMA is inflight - arming irq"); 344 sdp_arm_tx_cq(ssk); 345 } 346 #endif 347 return; 348 } 349 350 static void 351 sdp_poll_tx_timeout(void *data) 352 { 353 struct sdp_sock *ssk = (struct sdp_sock *)data; 354 355 if (!callout_active(&ssk->tx_ring.timer)) 356 return; 357 callout_deactivate(&ssk->tx_ring.timer); 358 sdp_poll_tx(ssk); 359 } 360 361 static void 362 sdp_tx_irq(struct ib_cq *cq, void *cq_context) 363 { 364 struct sdp_sock *ssk; 365 366 ssk = cq_context; 367 sdp_prf1(ssk->socket, NULL, "tx irq"); 368 sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n"); 369 SDPSTATS_COUNTER_INC(tx_int_count); 370 SDP_WLOCK(ssk); 371 sdp_poll_tx(ssk); 372 SDP_WUNLOCK(ssk); 373 } 374 375 static 376 void sdp_tx_ring_purge(struct sdp_sock *ssk) 377 { 378 while (tx_ring_posted(ssk)) { 379 struct mbuf *mb; 380 mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring)); 381 if (!mb) 382 break; 383 m_freem(mb); 384 } 385 } 386 387 void 388 sdp_post_keepalive(struct sdp_sock *ssk) 389 { 390 int rc; 391 struct ib_send_wr wr, *bad_wr; 392 393 sdp_dbg(ssk->socket, "%s\n", __func__); 394 395 memset(&wr, 0, sizeof(wr)); 396 397 wr.next = NULL; 398 wr.wr_id = 0; 399 wr.sg_list = NULL; 400 wr.num_sge = 0; 401 wr.opcode = IB_WR_RDMA_WRITE; 402 403 rc = ib_post_send(ssk->qp, &wr, &bad_wr); 404 if (rc) { 405 sdp_dbg(ssk->socket, 406 "ib_post_keepalive failed with status %d.\n", rc); 407 sdp_notify(ssk, ECONNRESET); 408 } 409 410 sdp_cnt(sdp_keepalive_probes_sent); 411 } 412 413 static void 414 sdp_tx_cq_event_handler(struct ib_event *event, void *data) 415 { 416 } 417 418 int 419 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) 420 { 421 struct ib_cq *tx_cq; 422 int rc = 0; 423 424 sdp_dbg(ssk->socket, "tx ring create\n"); 425 callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0); 426 callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0); 427 atomic_set(&ssk->tx_ring.head, 1); 428 atomic_set(&ssk->tx_ring.tail, 1); 429 430 ssk->tx_ring.buffer = kzalloc( 431 sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE, GFP_KERNEL); 432 if (!ssk->tx_ring.buffer) { 433 rc = -ENOMEM; 434 sdp_warn(ssk->socket, "Can't allocate TX Ring size %zd.\n", 435 sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE); 436 437 goto out; 438 } 439 440 tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler, 441 ssk, SDP_TX_SIZE, 0); 442 443 if (IS_ERR(tx_cq)) { 444 rc = PTR_ERR(tx_cq); 445 sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc); 446 goto err_cq; 447 } 448 ssk->tx_ring.cq = tx_cq; 449 ssk->tx_ring.poll_cnt = 0; 450 sdp_arm_tx_cq(ssk); 451 452 return 0; 453 454 err_cq: 455 kfree(ssk->tx_ring.buffer); 456 ssk->tx_ring.buffer = NULL; 457 out: 458 return rc; 459 } 460 461 void 462 sdp_tx_ring_destroy(struct sdp_sock *ssk) 463 { 464 465 sdp_dbg(ssk->socket, "tx ring destroy\n"); 466 SDP_WLOCK(ssk); 467 callout_stop(&ssk->tx_ring.timer); 468 callout_stop(&ssk->nagle_timer); 469 SDP_WUNLOCK(ssk); 470 callout_drain(&ssk->tx_ring.timer); 471 callout_drain(&ssk->nagle_timer); 472 473 if (ssk->tx_ring.buffer) { 474 sdp_tx_ring_purge(ssk); 475 476 kfree(ssk->tx_ring.buffer); 477 ssk->tx_ring.buffer = NULL; 478 } 479 480 if (ssk->tx_ring.cq) { 481 if (ib_destroy_cq(ssk->tx_ring.cq)) { 482 sdp_warn(ssk->socket, "destroy cq(%p) failed\n", 483 ssk->tx_ring.cq); 484 } else { 485 ssk->tx_ring.cq = NULL; 486 } 487 } 488 489 WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring)); 490 } 491