// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Connection Data Control (CDC)
 * handles flow control
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/spinlock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"
#include "smc_rx.h"
#include "smc_close.h"

/********************************** send *************************************/

/* handler for send/transmission completion of a CDC msg */
static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
			       struct smc_link *link,
			       enum ib_wc_status wc_status)
{
	struct smc_cdc_tx_pend *cdcpend = (struct smc_cdc_tx_pend *)pnd_snd;
	struct smc_connection *conn = cdcpend->conn;
	struct smc_buf_desc *sndbuf_desc;
	struct smc_sock *smc;
	int diff;

	sndbuf_desc = conn->sndbuf_desc;
	smc = container_of(conn, struct smc_sock, conn);
	bh_lock_sock(&smc->sk);
	if (!wc_status && sndbuf_desc) {
		diff = smc_curs_diff(sndbuf_desc->len,
				     &cdcpend->conn->tx_curs_fin,
				     &cdcpend->cursor);
		/* sndbuf_space is decreased in smc_sendmsg */
		smp_mb__before_atomic();
		atomic_add(diff, &cdcpend->conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
		smp_mb__after_atomic();
		smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
		smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
			      conn);
		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
	}

	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
		/* If user context owns the sock lock, mark the connection as
		 * needing a send. User context will later try to send when it
		 * releases the sock lock in smc_release_cb().
		 */
		if (sock_owned_by_user(&smc->sk))
			conn->tx_in_release_sock = true;
		else
			smc_tx_pending(conn);

		if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
			wake_up(&conn->cdc_pend_tx_wq);
	}
	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);

	smc_tx_sndbuf_nonfull(smc);
	bh_unlock_sock(&smc->sk);
}

int smc_cdc_get_free_slot(struct smc_connection *conn,
			  struct smc_link *link,
			  struct smc_wr_buf **wr_buf,
			  struct smc_rdma_wr **wr_rdma_buf,
			  struct smc_cdc_tx_pend **pend)
{
	int rc;

	rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
				     wr_rdma_buf,
				     (struct smc_wr_tx_pend_priv **)pend);
	if (conn->killed) {
		/* abnormal termination */
		if (!rc)
			smc_wr_tx_put_slot(link,
					   (struct smc_wr_tx_pend_priv *)(*pend));
		rc = -EPIPE;
	}
	return rc;
}

static inline void smc_cdc_add_pending_send(struct smc_connection *conn,
					    struct smc_cdc_tx_pend *pend)
{
	BUILD_BUG_ON_MSG(
		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
	BUILD_BUG_ON_MSG(
		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
	BUILD_BUG_ON_MSG(
		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
		"must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)");
	pend->conn = conn;
	pend->cursor = conn->tx_curs_sent;
	pend->p_cursor = conn->local_tx_ctrl.prod;
	pend->ctrl_seq = conn->tx_cdc_seq;
}
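
/* Build and post one CDC message for this connection.
 *
 * smc_cdc_msg_send() bumps the CDC sequence number, converts the host
 * cursors to wire format into the send WR buffer, accounts the message in
 * cdc_pend_tx_wr and then posts it on the connection's current link.  If
 * posting fails, the sequence number and the pending counter are rolled
 * back so a later attempt can reuse them.  Within this file it is called
 * under conn->send_lock by smcr_cdc_get_slot_and_msg_send().
 */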
int smc_cdc_msg_send(struct smc_connection *conn,
		     struct smc_wr_buf *wr_buf,
		     struct smc_cdc_tx_pend *pend)
{
	struct smc_link *link = conn->lnk;
	union smc_host_cursor cfed;
	int rc;

	smc_cdc_add_pending_send(conn, pend);

	conn->tx_cdc_seq++;
	conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
	smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);

	atomic_inc(&conn->cdc_pend_tx_wr);
	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */

	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
	if (!rc) {
		smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	} else {
		conn->tx_cdc_seq--;
		conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
		atomic_dec(&conn->cdc_pend_tx_wr);
	}

	return rc;
}

/* send a validation msg indicating the move of a conn to another QP link */
int smcr_cdc_msg_send_validation(struct smc_connection *conn,
				 struct smc_cdc_tx_pend *pend,
				 struct smc_wr_buf *wr_buf)
{
	struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
	struct smc_link *link = conn->lnk;
	struct smc_cdc_msg *peer;
	int rc;

	peer = (struct smc_cdc_msg *)wr_buf;
	peer->common.type = local->common.type;
	peer->len = local->len;
	peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno of last compl. tx */
	peer->token = htonl(local->token);
	peer->prod_flags.failover_validation = 1;

	/* pend->conn must be set here so that smc_cdc_tx_handler() can
	 * handle the completion properly
	 */
	smc_cdc_add_pending_send(conn, pend);

	atomic_inc(&conn->cdc_pend_tx_wr);
	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */

	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
	if (unlikely(rc))
		atomic_dec(&conn->cdc_pend_tx_wr);

	return rc;
}

static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	struct smc_link *link;
	bool again = false;
	int rc;

again:
	link = conn->lnk;
	if (!smc_wr_tx_link_hold(link))
		return -ENOLINK;
	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
	if (rc)
		goto put_out;

	spin_lock_bh(&conn->send_lock);
	if (link != conn->lnk) {
		/* link of connection changed, try again one time */
		spin_unlock_bh(&conn->send_lock);
		smc_wr_tx_put_slot(link,
				   (struct smc_wr_tx_pend_priv *)pend);
		smc_wr_tx_link_put(link);
		if (again)
			return -ENOLINK;
		again = true;
		goto again;
	}
	rc = smc_cdc_msg_send(conn, wr_buf, pend);
	spin_unlock_bh(&conn->send_lock);
put_out:
	smc_wr_tx_link_put(link);
	return rc;
}

int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
	int rc;

	if (!smc_conn_lgr_valid(conn) ||
	    (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
		return -EPIPE;

	if (conn->lgr->is_smcd) {
		spin_lock_bh(&conn->send_lock);
		rc = smcd_cdc_msg_send(conn);
		spin_unlock_bh(&conn->send_lock);
	} else {
		rc = smcr_cdc_get_slot_and_msg_send(conn);
	}

	return rc;
}

void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
{
	wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
}
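
/* smc_cdc_wait_pend_tx_wr() pairs with the wake_up() in smc_cdc_tx_handler():
 * cdc_pend_tx_wr counts CDC work requests that were posted but have not yet
 * completed, so the wait returns only once no CDC send is in flight on this
 * connection any more.
 *
 * The SMC-D path below differs from SMC-R: there is no link, work-request
 * slot or completion handler involved.  smcd_cdc_msg_send() writes the
 * control information directly via smcd_tx_ism_write() and does the send
 * buffer accounting immediately afterwards.
 */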

/* Send an SMC-D CDC header.
 * This increments the free space available in our send buffer.
 * Also update the confirmed receive buffer with what was sent to the peer.
 */
int smcd_cdc_msg_send(struct smc_connection *conn)
{
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	union smc_host_cursor curs;
	struct smcd_cdc_msg cdc;
	int rc, diff;

	memset(&cdc, 0, sizeof(cdc));
	cdc.common.type = SMC_CDC_MSG_TYPE;
	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
	cdc.prod.wrap = curs.wrap;
	cdc.prod.count = curs.count;
	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
	cdc.cons.wrap = curs.wrap;
	cdc.cons.count = curs.count;
	cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
	cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
	if (rc)
		return rc;
	smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
	conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	/* Calculate transmitted data and increment free send buffer space */
	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
			     &conn->tx_curs_sent);
	/* increased by confirmed number of bytes */
	smp_mb__before_atomic();
	atomic_add(diff, &conn->sndbuf_space);
	/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
	smp_mb__after_atomic();
	smc_curs_copy(&conn->tx_curs_fin, &conn->tx_curs_sent, conn);

	smc_tx_sndbuf_nonfull(smc);
	return rc;
}

/********************************* receive ***********************************/

static inline bool smc_cdc_before(u16 seq1, u16 seq2)
{
	return (s16)(seq1 - seq2) < 0;
}

static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
					    int *diff_prod)
{
	struct smc_connection *conn = &smc->conn;
	char *base;

	/* the new data includes urgent business */
	smc_curs_copy(&conn->urg_curs, &conn->local_rx_ctrl.prod, conn);
	conn->urg_state = SMC_URG_VALID;
	if (!sock_flag(&smc->sk, SOCK_URGINLINE))
		/* we'll skip the urgent byte, so don't account for it */
		(*diff_prod)--;
	base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
	if (conn->urg_curs.count)
		conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
	else
		conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
	sk_send_sigurg(&smc->sk);
}

static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
				 struct smc_link *link)
{
	struct smc_connection *conn = &smc->conn;
	u16 recv_seq = ntohs(cdc->seqno);
	s16 diff;

	/* check that the seqno was seen before */
	diff = conn->local_rx_ctrl.seqno - recv_seq;
	if (diff < 0) { /* diff larger than 0x7fff */
		/* drop connection */
		conn->out_of_sync = 1; /* prevent any further receives */
		spin_lock_bh(&conn->send_lock);
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
		conn->lnk = link;
		spin_unlock_bh(&conn->send_lock);
		sock_hold(&smc->sk); /* sock_put in abort_work */
		if (!queue_work(smc_close_wq, &conn->abort_work))
			sock_put(&smc->sk);
	}
}

static void smc_cdc_msg_recv_action(struct smc_sock *smc,
				    struct smc_cdc_msg *cdc)
{
	union smc_host_cursor cons_old, prod_old;
	struct smc_connection *conn = &smc->conn;
	int diff_cons, diff_prod;

	smc_curs_copy(&prod_old, &conn->local_rx_ctrl.prod, conn);
	smc_curs_copy(&cons_old, &conn->local_rx_ctrl.cons, conn);
	smc_cdc_msg_to_host(&conn->local_rx_ctrl, cdc, conn);
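
	/* cons_old/prod_old hold the cursors seen before this message.
	 * Comparing them with the newly received cursors yields how many
	 * bytes the peer has consumed from its RMBE (diff_cons, which frees
	 * peer_rmbe_space for our RDMA writes) and how many new bytes the
	 * peer has produced into our RMB (diff_prod, which becomes
	 * bytes_to_rcv).
	 */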
	diff_cons = smc_curs_diff(conn->peer_rmbe_size, &cons_old,
				  &conn->local_rx_ctrl.cons);
	if (diff_cons) {
		/* peer_rmbe_space is decreased during data transfer with RDMA
		 * write
		 */
		smp_mb__before_atomic();
		atomic_add(diff_cons, &conn->peer_rmbe_space);
		/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
		smp_mb__after_atomic();
	}

	diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
				  &conn->local_rx_ctrl.prod);
	if (diff_prod) {
		if (conn->local_rx_ctrl.prod_flags.urg_data_present)
			smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
		/* bytes_to_rcv is decreased in smc_recvmsg */
		smp_mb__before_atomic();
		atomic_add(diff_prod, &conn->bytes_to_rcv);
		/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
		smp_mb__after_atomic();
		smc->sk.sk_data_ready(&smc->sk);
	} else {
		if (conn->local_rx_ctrl.prod_flags.write_blocked)
			smc->sk.sk_data_ready(&smc->sk);
		if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
			conn->urg_state = SMC_URG_NOTYET;
	}

	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
		if (!sock_owned_by_user(&smc->sk))
			smc_tx_pending(conn);
		else
			conn->tx_in_release_sock = true;
	}

	if (diff_cons && conn->urg_tx_pend &&
	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
		/* urg data confirmed by peer, indicate we're ready for more */
		conn->urg_tx_pend = false;
		smc->sk.sk_write_space(&smc->sk);
	}

	if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
		smc->sk.sk_err = ECONNRESET;
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	}
	if (smc_cdc_rxed_any_close_or_senddone(conn)) {
		smc->sk.sk_shutdown |= RCV_SHUTDOWN;
		if (smc->clcsock && smc->clcsock->sk)
			smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
		smc_sock_set_flag(&smc->sk, SOCK_DONE);
		sock_hold(&smc->sk); /* sock_put in close_work */
		if (!queue_work(smc_close_wq, &conn->close_work))
			sock_put(&smc->sk);
	}
}

/* called in tasklet context */
static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
{
	sock_hold(&smc->sk);
	bh_lock_sock(&smc->sk);
	smc_cdc_msg_recv_action(smc, cdc);
	bh_unlock_sock(&smc->sk);
	sock_put(&smc->sk); /* no free sk in softirq-context */
}

/* Tasklet handler for this connection. Scheduled from the ISM device IRQ
 * handler to indicate an update in the DMBE.
 *
 * Context:
 * - tasklet context
 */
static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
{
	struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
	struct smcd_cdc_msg *data_cdc;
	struct smcd_cdc_msg cdc;
	struct smc_sock *smc;

	if (!conn || conn->killed)
		return;

	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
	smc = container_of(conn, struct smc_sock, conn);
	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
}
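
/* smcd_cdc_rx_tsklet() copies the producer and consumer cursors from the CDC
 * area at the start of the DMB (conn->rmb_desc->cpu_addr) into a local
 * smcd_cdc_msg and then feeds that snapshot into smc_cdc_msg_recv(), the
 * same receive path used by the SMC-R handler below.
 */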

/* Initialize receive tasklet. Called from ISM device IRQ handler to start
 * receiver side.
 */
void smcd_cdc_rx_init(struct smc_connection *conn)
{
	tasklet_setup(&conn->rx_tsklet, smcd_cdc_rx_tsklet);
}

/***************************** init, exit, misc ******************************/

static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
{
	struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
	struct smc_cdc_msg *cdc = buf;
	struct smc_connection *conn;
	struct smc_link_group *lgr;
	struct smc_sock *smc;

	if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved))
		return; /* short message */
	if (cdc->len != SMC_WR_TX_SIZE)
		return; /* invalid message */

	/* lookup connection */
	lgr = smc_get_lgr(link);
	read_lock_bh(&lgr->conns_lock);
	conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
	read_unlock_bh(&lgr->conns_lock);
	if (!conn || conn->out_of_sync)
		return;
	smc = container_of(conn, struct smc_sock, conn);

	if (cdc->prod_flags.failover_validation) {
		smc_cdc_msg_validate(smc, cdc, link);
		return;
	}
	if (smc_cdc_before(ntohs(cdc->seqno),
			   conn->local_rx_ctrl.seqno))
		/* received seqno is old */
		return;

	smc_cdc_msg_recv(smc, cdc);
}

static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = {
	{
		.handler	= smc_cdc_rx_handler,
		.type		= SMC_CDC_MSG_TYPE
	},
	{
		.handler	= NULL,
	}
};

int __init smc_cdc_init(void)
{
	struct smc_wr_rx_handler *handler;
	int rc = 0;

	for (handler = smc_cdc_rx_handlers; handler->handler; handler++) {
		INIT_HLIST_NODE(&handler->list);
		rc = smc_wr_rx_register_handler(handler);
		if (rc)
			break;
	}
	return rc;
}