1 /* 2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 * 32 */ 33 #include <linux/kernel.h> 34 #include <linux/moduleparam.h> 35 #include <linux/gfp.h> 36 #include <net/sock.h> 37 #include <linux/in.h> 38 #include <linux/list.h> 39 #include <linux/ratelimit.h> 40 #include <linux/export.h> 41 #include <linux/sizes.h> 42 43 #include "rds.h" 44 45 /* When transmitting messages in rds_send_xmit, we need to emerge from 46 * time to time and briefly release the CPU. Otherwise the softlock watchdog 47 * will kick our shin. 48 * Also, it seems fairer to not let one busy connection stall all the 49 * others. 
50 * 51 * send_batch_count is the number of times we'll loop in send_xmit. Setting 52 * it to 0 will restore the old behavior (where we looped until we had 53 * drained the queue). 54 */ 55 static int send_batch_count = SZ_1K; 56 module_param(send_batch_count, int, 0444); 57 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); 58 59 static void rds_send_remove_from_sock(struct list_head *messages, int status); 60 61 /* 62 * Reset the send state. Callers must ensure that this doesn't race with 63 * rds_send_xmit(). 64 */ 65 void rds_send_path_reset(struct rds_conn_path *cp) 66 { 67 struct rds_message *rm, *tmp; 68 unsigned long flags; 69 70 if (cp->cp_xmit_rm) { 71 rm = cp->cp_xmit_rm; 72 cp->cp_xmit_rm = NULL; 73 /* Tell the user the RDMA op is no longer mapped by the 74 * transport. This isn't entirely true (it's flushed out 75 * independently) but as the connection is down, there's 76 * no ongoing RDMA to/from that memory */ 77 rds_message_unmapped(rm); 78 rds_message_put(rm); 79 } 80 81 cp->cp_xmit_sg = 0; 82 cp->cp_xmit_hdr_off = 0; 83 cp->cp_xmit_data_off = 0; 84 cp->cp_xmit_atomic_sent = 0; 85 cp->cp_xmit_rdma_sent = 0; 86 cp->cp_xmit_data_sent = 0; 87 88 cp->cp_conn->c_map_queued = 0; 89 90 cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; 91 cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; 92 93 /* Mark messages as retransmissions, and move them to the send q */ 94 spin_lock_irqsave(&cp->cp_lock, flags); 95 list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { 96 set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); 97 set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); 98 } 99 list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); 100 spin_unlock_irqrestore(&cp->cp_lock, flags); 101 } 102 EXPORT_SYMBOL_GPL(rds_send_path_reset); 103 104 static int acquire_in_xmit(struct rds_conn_path *cp) 105 { 106 return test_and_set_bit_lock(RDS_IN_XMIT, &cp->cp_flags) == 0; 107 } 108 109 static void release_in_xmit(struct rds_conn_path 
*cp) 110 { 111 clear_bit_unlock(RDS_IN_XMIT, &cp->cp_flags); 112 /* 113 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a 114 * hot path and finding waiters is very rare. We don't want to walk 115 * the system-wide hashed waitqueue buckets in the fast path only to 116 * almost never find waiters. 117 */ 118 if (waitqueue_active(&cp->cp_waitq)) 119 wake_up_all(&cp->cp_waitq); 120 } 121 122 /* 123 * Helper function for multipath fanout to ensure lane 0 transmits queued 124 * messages before other lanes to prevent out-of-order delivery. 125 * 126 * Returns true if lane 0 still has messages or false otherwise 127 */ 128 static bool rds_mprds_cp0_catchup(struct rds_connection *conn) 129 { 130 struct rds_conn_path *cp0 = conn->c_path; 131 struct rds_message *rm0; 132 unsigned long flags; 133 bool ret = false; 134 135 spin_lock_irqsave(&cp0->cp_lock, flags); 136 137 /* the oldest / first message in the retransmit queue 138 * has to be at or beyond c_cp0_mprds_catchup_tx_seq 139 */ 140 if (!list_empty(&cp0->cp_retrans)) { 141 rm0 = list_entry(cp0->cp_retrans.next, struct rds_message, 142 m_conn_item); 143 if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) < 144 conn->c_cp0_mprds_catchup_tx_seq) { 145 /* the retransmit queue of cp_index#0 has not 146 * quite caught up yet 147 */ 148 ret = true; 149 goto unlock; 150 } 151 } 152 153 /* the oldest / first message of the send queue 154 * has to be at or beyond c_cp0_mprds_catchup_tx_seq 155 */ 156 rm0 = cp0->cp_xmit_rm; 157 if (!rm0 && !list_empty(&cp0->cp_send_queue)) 158 rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message, 159 m_conn_item); 160 if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) < 161 conn->c_cp0_mprds_catchup_tx_seq) { 162 /* the send queue of cp_index#0 has not quite 163 * caught up yet 164 */ 165 ret = true; 166 } 167 168 unlock: 169 spin_unlock_irqrestore(&cp0->cp_lock, flags); 170 return ret; 171 } 172 173 /* 174 * We're making the conscious trade-off here to only send one message 
175 * down the connection at a time. 176 * Pro: 177 * - tx queueing is a simple fifo list 178 * - reassembly is optional and easily done by transports per conn 179 * - no per flow rx lookup at all, straight to the socket 180 * - less per-frag memory and wire overhead 181 * Con: 182 * - queued acks can be delayed behind large messages 183 * Depends: 184 * - small message latency is higher behind queued large messages 185 * - large message latency isn't starved by intervening small sends 186 */ 187 int rds_send_xmit(struct rds_conn_path *cp) 188 { 189 struct rds_connection *conn = cp->cp_conn; 190 struct rds_message *rm; 191 unsigned long flags; 192 unsigned int tmp; 193 struct scatterlist *sg; 194 int ret = 0; 195 LIST_HEAD(to_be_dropped); 196 int batch_count; 197 unsigned long send_gen = 0; 198 int same_rm = 0; 199 200 restart: 201 batch_count = 0; 202 203 /* 204 * sendmsg calls here after having queued its message on the send 205 * queue. We only have one task feeding the connection at a time. If 206 * another thread is already feeding the queue then we back off. This 207 * avoids blocking the caller and trading per-connection data between 208 * caches per message. 209 */ 210 if (!acquire_in_xmit(cp)) { 211 rds_stats_inc(s_send_lock_contention); 212 ret = -ENOMEM; 213 goto out; 214 } 215 216 if (rds_destroy_pending(cp->cp_conn)) { 217 release_in_xmit(cp); 218 ret = -ENETUNREACH; /* dont requeue send work */ 219 goto out; 220 } 221 222 /* 223 * we record the send generation after doing the xmit acquire. 224 * if someone else manages to jump in and do some work, we'll use 225 * this to avoid a goto restart farther down. 226 * 227 * The acquire_in_xmit() check above ensures that only one 228 * caller can increment c_send_gen at any time. 229 */ 230 send_gen = READ_ONCE(cp->cp_send_gen) + 1; 231 WRITE_ONCE(cp->cp_send_gen, send_gen); 232 233 /* 234 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, 235 * we do the opposite to avoid races. 
236 */ 237 if (!rds_conn_path_up(cp)) { 238 release_in_xmit(cp); 239 ret = 0; 240 goto out; 241 } 242 243 if (conn->c_trans->xmit_path_prepare) 244 conn->c_trans->xmit_path_prepare(cp); 245 246 /* 247 * spin trying to push headers and data down the connection until 248 * the connection doesn't make forward progress. 249 */ 250 while (1) { 251 252 rm = cp->cp_xmit_rm; 253 254 if (!rm) { 255 same_rm = 0; 256 } else { 257 same_rm++; 258 if (same_rm >= 4096) { 259 rds_stats_inc(s_send_stuck_rm); 260 ret = -EAGAIN; 261 break; 262 } 263 } 264 265 /* 266 * If between sending messages, we can send a pending congestion 267 * map update. 268 */ 269 if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { 270 rm = rds_cong_update_alloc(conn); 271 if (IS_ERR(rm)) { 272 ret = PTR_ERR(rm); 273 break; 274 } 275 rm->data.op_active = 1; 276 rm->m_inc.i_conn_path = cp; 277 rm->m_inc.i_conn = cp->cp_conn; 278 279 cp->cp_xmit_rm = rm; 280 } 281 282 /* 283 * If not already working on one, grab the next message. 284 * 285 * cp_xmit_rm holds a ref while we're sending this message down 286 * the connection. We can use this ref while holding the 287 * send_sem.. rds_send_path_reset() is serialized with it. 288 */ 289 if (!rm) { 290 unsigned int len; 291 292 batch_count++; 293 294 /* we want to process as big a batch as we can, but 295 * we also want to avoid softlockups. 
If we've been 296 * through a lot of messages, lets back off and see 297 * if anyone else jumps in 298 */ 299 if (batch_count >= send_batch_count) 300 goto over_batch; 301 302 /* make sure cp_index#0 caught up during fan-out in 303 * order to avoid lane races 304 */ 305 if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) { 306 rds_stats_inc(s_mprds_catchup_tx0_retries); 307 goto over_batch; 308 } 309 310 spin_lock_irqsave(&cp->cp_lock, flags); 311 312 if (!list_empty(&cp->cp_send_queue)) { 313 rm = list_entry(cp->cp_send_queue.next, 314 struct rds_message, 315 m_conn_item); 316 rds_message_addref(rm); 317 318 /* 319 * Move the message from the send queue to the retransmit 320 * list right away. 321 */ 322 list_move_tail(&rm->m_conn_item, 323 &cp->cp_retrans); 324 } 325 326 spin_unlock_irqrestore(&cp->cp_lock, flags); 327 328 if (!rm) 329 break; 330 331 /* Unfortunately, the way Infiniband deals with 332 * RDMA to a bad MR key is by moving the entire 333 * queue pair to error state. We could possibly 334 * recover from that, but right now we drop the 335 * connection. 336 * Therefore, we never retransmit messages with RDMA ops. 
337 */ 338 if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) || 339 (rm->rdma.op_active && 340 test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) { 341 spin_lock_irqsave(&cp->cp_lock, flags); 342 if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) 343 list_move(&rm->m_conn_item, &to_be_dropped); 344 spin_unlock_irqrestore(&cp->cp_lock, flags); 345 continue; 346 } 347 348 /* Require an ACK every once in a while */ 349 len = ntohl(rm->m_inc.i_hdr.h_len); 350 if (cp->cp_unacked_packets == 0 || 351 cp->cp_unacked_bytes < len) { 352 set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); 353 354 cp->cp_unacked_packets = 355 rds_sysctl_max_unacked_packets; 356 cp->cp_unacked_bytes = 357 rds_sysctl_max_unacked_bytes; 358 rds_stats_inc(s_send_ack_required); 359 } else { 360 cp->cp_unacked_bytes -= len; 361 cp->cp_unacked_packets--; 362 } 363 364 cp->cp_xmit_rm = rm; 365 } 366 367 /* The transport either sends the whole rdma or none of it */ 368 if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { 369 rm->m_final_op = &rm->rdma; 370 /* The transport owns the mapped memory for now. 371 * You can't unmap it while it's on the send queue 372 */ 373 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 374 ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); 375 if (ret) { 376 clear_bit(RDS_MSG_MAPPED, &rm->m_flags); 377 wake_up_interruptible(&rm->m_flush_wait); 378 break; 379 } 380 cp->cp_xmit_rdma_sent = 1; 381 382 } 383 384 if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { 385 rm->m_final_op = &rm->atomic; 386 /* The transport owns the mapped memory for now. 387 * You can't unmap it while it's on the send queue 388 */ 389 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 390 ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); 391 if (ret) { 392 clear_bit(RDS_MSG_MAPPED, &rm->m_flags); 393 wake_up_interruptible(&rm->m_flush_wait); 394 break; 395 } 396 cp->cp_xmit_atomic_sent = 1; 397 398 } 399 400 /* 401 * A number of cases require an RDS header to be sent 402 * even if there is no data. 
403 * We permit 0-byte sends; rds-ping depends on this. 404 * However, if there are exclusively attached silent ops, 405 * we skip the hdr/data send, to enable silent operation. 406 */ 407 if (rm->data.op_nents == 0) { 408 int ops_present; 409 int all_ops_are_silent = 1; 410 411 ops_present = (rm->atomic.op_active || rm->rdma.op_active); 412 if (rm->atomic.op_active && !rm->atomic.op_silent) 413 all_ops_are_silent = 0; 414 if (rm->rdma.op_active && !rm->rdma.op_silent) 415 all_ops_are_silent = 0; 416 417 if (ops_present && all_ops_are_silent 418 && !rm->m_rdma_cookie) 419 rm->data.op_active = 0; 420 } 421 422 if (rm->data.op_active && !cp->cp_xmit_data_sent) { 423 rm->m_final_op = &rm->data; 424 425 ret = conn->c_trans->xmit(conn, rm, 426 cp->cp_xmit_hdr_off, 427 cp->cp_xmit_sg, 428 cp->cp_xmit_data_off); 429 if (ret <= 0) 430 break; 431 432 if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { 433 tmp = min_t(int, ret, 434 sizeof(struct rds_header) - 435 cp->cp_xmit_hdr_off); 436 cp->cp_xmit_hdr_off += tmp; 437 ret -= tmp; 438 } 439 440 sg = &rm->data.op_sg[cp->cp_xmit_sg]; 441 while (ret) { 442 tmp = min_t(int, ret, sg->length - 443 cp->cp_xmit_data_off); 444 cp->cp_xmit_data_off += tmp; 445 ret -= tmp; 446 if (cp->cp_xmit_data_off == sg->length) { 447 cp->cp_xmit_data_off = 0; 448 sg++; 449 cp->cp_xmit_sg++; 450 BUG_ON(ret != 0 && cp->cp_xmit_sg == 451 rm->data.op_nents); 452 } 453 } 454 455 if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && 456 (cp->cp_xmit_sg == rm->data.op_nents)) 457 cp->cp_xmit_data_sent = 1; 458 } 459 460 /* 461 * A rm will only take multiple times through this loop 462 * if there is a data op. Thus, if the data is sent (or there was 463 * none), then we're done with the rm. 
464 */ 465 if (!rm->data.op_active || cp->cp_xmit_data_sent) { 466 cp->cp_xmit_rm = NULL; 467 cp->cp_xmit_sg = 0; 468 cp->cp_xmit_hdr_off = 0; 469 cp->cp_xmit_data_off = 0; 470 cp->cp_xmit_rdma_sent = 0; 471 cp->cp_xmit_atomic_sent = 0; 472 cp->cp_xmit_data_sent = 0; 473 474 rds_message_put(rm); 475 } 476 } 477 478 over_batch: 479 if (conn->c_trans->xmit_path_complete) 480 conn->c_trans->xmit_path_complete(cp); 481 release_in_xmit(cp); 482 483 /* Nuke any messages we decided not to retransmit. */ 484 if (!list_empty(&to_be_dropped)) { 485 /* irqs on here, so we can put(), unlike above */ 486 list_for_each_entry(rm, &to_be_dropped, m_conn_item) 487 rds_message_put(rm); 488 rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); 489 } 490 491 /* 492 * Other senders can queue a message after we last test the send queue 493 * but before we clear RDS_IN_XMIT. In that case they'd back off and 494 * not try and send their newly queued message. We need to check the 495 * send queue after having cleared RDS_IN_XMIT so that their message 496 * doesn't get stuck on the send queue. 497 * 498 * If the transport cannot continue (i.e ret != 0), then it must 499 * call us when more room is available, such as from the tx 500 * completion handler. 
501 * 502 * We have an extra generation check here so that if someone manages 503 * to jump in after our release_in_xmit, we'll see that they have done 504 * some work and we will skip our goto 505 */ 506 if (ret == 0) { 507 bool raced; 508 509 smp_mb(); 510 raced = send_gen != READ_ONCE(cp->cp_send_gen); 511 512 if ((test_bit(0, &conn->c_map_queued) || 513 !list_empty(&cp->cp_send_queue)) && !raced) { 514 if (batch_count < send_batch_count) 515 goto restart; 516 rcu_read_lock(); 517 if (rds_destroy_pending(cp->cp_conn)) 518 ret = -ENETUNREACH; 519 else 520 queue_delayed_work(cp->cp_wq, 521 &cp->cp_send_w, 1); 522 rcu_read_unlock(); 523 } else if (raced) { 524 rds_stats_inc(s_send_lock_queue_raced); 525 } 526 } 527 out: 528 return ret; 529 } 530 EXPORT_SYMBOL_GPL(rds_send_xmit); 531 532 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm) 533 { 534 u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len); 535 536 assert_spin_locked(&rs->rs_lock); 537 538 BUG_ON(rs->rs_snd_bytes < len); 539 rs->rs_snd_bytes -= len; 540 541 if (rs->rs_snd_bytes == 0) 542 rds_stats_inc(s_send_queue_empty); 543 } 544 545 static inline int rds_send_is_acked(struct rds_message *rm, u64 ack, 546 is_acked_func is_acked) 547 { 548 if (is_acked) 549 return is_acked(rm, ack); 550 return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack; 551 } 552 553 /* 554 * This is pretty similar to what happens below in the ACK 555 * handling code - except that we call here as soon as we get 556 * the IB send completion on the RDMA op and the accompanying 557 * message. 
558 */ 559 void rds_rdma_send_complete(struct rds_message *rm, int status) 560 { 561 struct rds_sock *rs = NULL; 562 struct rm_rdma_op *ro; 563 struct rds_notifier *notifier; 564 unsigned long flags; 565 566 spin_lock_irqsave(&rm->m_rs_lock, flags); 567 568 ro = &rm->rdma; 569 if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && 570 ro->op_active && ro->op_notify && ro->op_notifier) { 571 notifier = ro->op_notifier; 572 rs = rm->m_rs; 573 sock_hold(rds_rs_to_sk(rs)); 574 575 notifier->n_status = status; 576 spin_lock(&rs->rs_lock); 577 list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); 578 spin_unlock(&rs->rs_lock); 579 580 ro->op_notifier = NULL; 581 } 582 583 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 584 585 if (rs) { 586 rds_wake_sk_sleep(rs); 587 sock_put(rds_rs_to_sk(rs)); 588 } 589 } 590 EXPORT_SYMBOL_GPL(rds_rdma_send_complete); 591 592 /* 593 * Just like above, except looks at atomic op 594 */ 595 void rds_atomic_send_complete(struct rds_message *rm, int status) 596 { 597 struct rds_sock *rs = NULL; 598 struct rm_atomic_op *ao; 599 struct rds_notifier *notifier; 600 unsigned long flags; 601 602 spin_lock_irqsave(&rm->m_rs_lock, flags); 603 604 ao = &rm->atomic; 605 if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) 606 && ao->op_active && ao->op_notify && ao->op_notifier) { 607 notifier = ao->op_notifier; 608 rs = rm->m_rs; 609 sock_hold(rds_rs_to_sk(rs)); 610 611 notifier->n_status = status; 612 spin_lock(&rs->rs_lock); 613 list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); 614 spin_unlock(&rs->rs_lock); 615 616 ao->op_notifier = NULL; 617 } 618 619 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 620 621 if (rs) { 622 rds_wake_sk_sleep(rs); 623 sock_put(rds_rs_to_sk(rs)); 624 } 625 } 626 EXPORT_SYMBOL_GPL(rds_atomic_send_complete); 627 628 /* 629 * This is the same as rds_rdma_send_complete except we 630 * don't do any locking - we have all the ingredients (message, 631 * socket, socket lock) and can just move the notifier. 
632 */ 633 static inline void 634 __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) 635 { 636 struct rm_rdma_op *ro; 637 struct rm_atomic_op *ao; 638 639 ro = &rm->rdma; 640 if (ro->op_active && ro->op_notify && ro->op_notifier) { 641 ro->op_notifier->n_status = status; 642 list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue); 643 ro->op_notifier = NULL; 644 } 645 646 ao = &rm->atomic; 647 if (ao->op_active && ao->op_notify && ao->op_notifier) { 648 ao->op_notifier->n_status = status; 649 list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue); 650 ao->op_notifier = NULL; 651 } 652 653 /* No need to wake the app - caller does this */ 654 } 655 656 /* 657 * This removes messages from the socket's list if they're on it. The list 658 * argument must be private to the caller, we must be able to modify it 659 * without locks. The messages must have a reference held for their 660 * position on the list. This function will drop that reference after 661 * removing the messages from the 'messages' list regardless of if it found 662 * the messages on the socket list or not. 663 */ 664 static void rds_send_remove_from_sock(struct list_head *messages, int status) 665 { 666 unsigned long flags; 667 struct rds_sock *rs = NULL; 668 struct rds_message *rm; 669 670 while (!list_empty(messages)) { 671 int was_on_sock = 0; 672 673 rm = list_entry(messages->next, struct rds_message, 674 m_conn_item); 675 list_del_init(&rm->m_conn_item); 676 677 /* 678 * If we see this flag cleared then we're *sure* that someone 679 * else beat us to removing it from the sock. If we race 680 * with their flag update we'll get the lock and then really 681 * see that the flag has been cleared. 682 * 683 * The message spinlock makes sure nobody clears rm->m_rs 684 * while we're messing with it. It does not prevent the 685 * message from being removed from the socket, though. 
686 */ 687 spin_lock_irqsave(&rm->m_rs_lock, flags); 688 if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) 689 goto unlock_and_drop; 690 691 if (rs != rm->m_rs) { 692 if (rs) { 693 rds_wake_sk_sleep(rs); 694 sock_put(rds_rs_to_sk(rs)); 695 } 696 rs = rm->m_rs; 697 if (rs) 698 sock_hold(rds_rs_to_sk(rs)); 699 } 700 if (!rs) 701 goto unlock_and_drop; 702 spin_lock(&rs->rs_lock); 703 704 if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { 705 struct rm_rdma_op *ro = &rm->rdma; 706 struct rds_notifier *notifier; 707 708 list_del_init(&rm->m_sock_item); 709 rds_send_sndbuf_remove(rs, rm); 710 711 if (ro->op_active && ro->op_notifier && 712 (ro->op_notify || (ro->op_recverr && status))) { 713 notifier = ro->op_notifier; 714 list_add_tail(¬ifier->n_list, 715 &rs->rs_notify_queue); 716 if (!notifier->n_status) 717 notifier->n_status = status; 718 rm->rdma.op_notifier = NULL; 719 } 720 was_on_sock = 1; 721 } 722 spin_unlock(&rs->rs_lock); 723 724 unlock_and_drop: 725 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 726 rds_message_put(rm); 727 if (was_on_sock) 728 rds_message_put(rm); 729 } 730 731 if (rs) { 732 rds_wake_sk_sleep(rs); 733 sock_put(rds_rs_to_sk(rs)); 734 } 735 } 736 737 /* 738 * Transports call here when they've determined that the receiver queued 739 * messages up to, and including, the given sequence number. Messages are 740 * moved to the retrans queue when rds_send_xmit picks them off the send 741 * queue. This means that in the TCP case, the message may not have been 742 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked 743 * checks the RDS_MSG_HAS_ACK_SEQ bit. 
744 */ 745 void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, 746 is_acked_func is_acked) 747 { 748 struct rds_message *rm, *tmp; 749 unsigned long flags; 750 LIST_HEAD(list); 751 752 spin_lock_irqsave(&cp->cp_lock, flags); 753 754 list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { 755 if (!rds_send_is_acked(rm, ack, is_acked)) 756 break; 757 758 list_move(&rm->m_conn_item, &list); 759 clear_bit(RDS_MSG_ON_CONN, &rm->m_flags); 760 } 761 762 /* order flag updates with spin locks */ 763 if (!list_empty(&list)) 764 smp_mb__after_atomic(); 765 766 spin_unlock_irqrestore(&cp->cp_lock, flags); 767 768 /* now remove the messages from the sock list as needed */ 769 rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); 770 } 771 EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); 772 773 void rds_send_drop_acked(struct rds_connection *conn, u64 ack, 774 is_acked_func is_acked) 775 { 776 WARN_ON(conn->c_trans->t_mp_capable); 777 rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); 778 } 779 EXPORT_SYMBOL_GPL(rds_send_drop_acked); 780 781 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest) 782 { 783 struct rds_message *rm, *tmp; 784 struct rds_connection *conn; 785 struct rds_conn_path *cp; 786 unsigned long flags; 787 LIST_HEAD(list); 788 789 /* get all the messages we're dropping under the rs lock */ 790 spin_lock_irqsave(&rs->rs_lock, flags); 791 792 list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) { 793 if (dest && 794 (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) || 795 dest->sin6_port != rm->m_inc.i_hdr.h_dport)) 796 continue; 797 798 list_move(&rm->m_sock_item, &list); 799 rds_send_sndbuf_remove(rs, rm); 800 clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags); 801 } 802 803 /* order flag updates with the rs lock */ 804 smp_mb__after_atomic(); 805 806 spin_unlock_irqrestore(&rs->rs_lock, flags); 807 808 if (list_empty(&list)) 809 return; 810 811 /* Remove the messages from the conn */ 812 list_for_each_entry(rm, 
&list, m_sock_item) { 813 814 conn = rm->m_inc.i_conn; 815 if (conn->c_trans->t_mp_capable) 816 cp = rm->m_inc.i_conn_path; 817 else 818 cp = &conn->c_path[0]; 819 820 spin_lock_irqsave(&cp->cp_lock, flags); 821 /* 822 * Maybe someone else beat us to removing rm from the conn. 823 * If we race with their flag update we'll get the lock and 824 * then really see that the flag has been cleared. 825 */ 826 if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { 827 spin_unlock_irqrestore(&cp->cp_lock, flags); 828 continue; 829 } 830 list_del_init(&rm->m_conn_item); 831 spin_unlock_irqrestore(&cp->cp_lock, flags); 832 833 /* 834 * Couldn't grab m_rs_lock in top loop (lock ordering), 835 * but we can now. 836 */ 837 spin_lock_irqsave(&rm->m_rs_lock, flags); 838 839 spin_lock(&rs->rs_lock); 840 __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); 841 spin_unlock(&rs->rs_lock); 842 843 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 844 845 rds_message_put(rm); 846 } 847 848 rds_wake_sk_sleep(rs); 849 850 while (!list_empty(&list)) { 851 rm = list_entry(list.next, struct rds_message, m_sock_item); 852 list_del_init(&rm->m_sock_item); 853 rds_message_wait(rm); 854 855 /* just in case the code above skipped this message 856 * because RDS_MSG_ON_CONN wasn't set, run it again here 857 * taking m_rs_lock is the only thing that keeps us 858 * from racing with ack processing. 859 */ 860 spin_lock_irqsave(&rm->m_rs_lock, flags); 861 862 spin_lock(&rs->rs_lock); 863 __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); 864 spin_unlock(&rs->rs_lock); 865 866 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 867 868 rds_message_put(rm); 869 } 870 } 871 872 /* 873 * we only want this to fire once so we use the callers 'queued'. It's 874 * possible that another thread can race with us and remove the 875 * message from the flow with RDS_CANCEL_SENT_TO. 
876 */ 877 static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, 878 struct rds_conn_path *cp, 879 struct rds_message *rm, __be16 sport, 880 __be16 dport, int *queued) 881 { 882 unsigned long flags; 883 u32 len; 884 885 if (*queued) 886 goto out; 887 888 len = be32_to_cpu(rm->m_inc.i_hdr.h_len); 889 890 /* this is the only place which holds both the socket's rs_lock 891 * and the connection's c_lock */ 892 spin_lock_irqsave(&rs->rs_lock, flags); 893 894 /* 895 * If there is a little space in sndbuf, we don't queue anything, 896 * and userspace gets -EAGAIN. But poll() indicates there's send 897 * room. This can lead to bad behavior (spinning) if snd_bytes isn't 898 * freed up by incoming acks. So we check the *old* value of 899 * rs_snd_bytes here to allow the last msg to exceed the buffer, 900 * and poll() now knows no more data can be sent. 901 */ 902 if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) { 903 rs->rs_snd_bytes += len; 904 905 /* let recv side know we are close to send space exhaustion. 906 * This is probably not the optimal way to do it, as this 907 * means we set the flag on *all* messages as soon as our 908 * throughput hits a certain threshold. 
909 */ 910 if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2) 911 set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); 912 913 list_add_tail(&rm->m_sock_item, &rs->rs_send_queue); 914 set_bit(RDS_MSG_ON_SOCK, &rm->m_flags); 915 rds_message_addref(rm); 916 sock_hold(rds_rs_to_sk(rs)); 917 rm->m_rs = rs; 918 919 /* The code ordering is a little weird, but we're 920 trying to minimize the time we hold c_lock */ 921 rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); 922 rm->m_inc.i_conn = conn; 923 rm->m_inc.i_conn_path = cp; 924 rds_message_addref(rm); 925 926 spin_lock(&cp->cp_lock); 927 rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); 928 list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); 929 set_bit(RDS_MSG_ON_CONN, &rm->m_flags); 930 spin_unlock(&cp->cp_lock); 931 932 rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", 933 rm, len, rs, rs->rs_snd_bytes, 934 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence)); 935 936 *queued = 1; 937 } 938 939 spin_unlock_irqrestore(&rs->rs_lock, flags); 940 out: 941 return *queued; 942 } 943 944 /* 945 * rds_message is getting to be quite complicated, and we'd like to allocate 946 * it all in one go. This figures out how big it needs to be up front. 
 *
 * Returns the number of extra bytes needed: one scatterlist per payload
 * page (@num_sgs), plus one per atomic cmsg, plus whatever
 * rds_rdma_extra_size() reports for each RDMA_ARGS cmsg.  Returns a
 * negative errno on a malformed or conflicting cmsg set.  Each RDMA_ARGS
 * cmsg also claims (growing if needed) and zeroes the next entry in @vct,
 * for rds_cmsg_send() to consume later.
 */
static int rds_rm_size(struct msghdr *msg, int num_sgs,
		       struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int size = 0;
	/* bit 0: ARGS/ATOMIC seen, bit 1: DEST/MAP/ZCOPY_COOKIE seen */
	int cmsg_groups = 0;
	int retval;
	bool zcopy_cookie = false;
	struct rds_iov_vector *iov, *tmp_iov;

	if (num_sgs < 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args)))
				return -EINVAL;
			/* grow vct->vec by vct->incr entries when it is full */
			if (vct->indx >= vct->len) {
				vct->len += vct->incr;
				tmp_iov =
					krealloc(vct->vec,
						 vct->len *
						 sizeof(struct rds_iov_vector),
						 GFP_KERNEL);
				if (!tmp_iov) {
					vct->len -= vct->incr;
					return -ENOMEM;
				}
				vct->vec = tmp_iov;
			}
			iov = &vct->vec[vct->indx];
			memset(iov, 0, sizeof(struct rds_iov_vector));
			vct->indx++;
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			zcopy_cookie = true;
			fallthrough;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	/* MSG_ZEROCOPY requires a cookie cmsg */
	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
		return -EINVAL;

	size += num_sgs * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

/*
 * Copy the user's zerocopy cookie into the message's zerocopy notifier.
 * Returns -EINVAL if the cmsg is too short or the message has no
 * zerocopy notifier.
 */
static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
			  struct cmsghdr *cmsg)
{
	u32 *cookie;

	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
	    !rm->data.op_mmp_znotifier)
		return -EINVAL;
	cookie = CMSG_DATA(cmsg);
	rm->data.op_mmp_znotifier->z_cookie = *cookie;
	return 0;
}

/*
 * Apply each SOL_RDS cmsg in @msg to @rm.  RDMA_ARGS cmsgs consume the
 * @vct entries prepared by rds_rm_size(), in order.  *allocated_mr is set
 * once an RDMA_MAP succeeds.  Stops at, and returns, the first error; an
 * -ENODEV from RDMA_MAP is reported as -EAGAIN.
 */
static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr,
			 struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int ret = 0, ind = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP set the message's
		 * RDMA cookie (rm->m_rdma_cookie) and MR.
		 * NOTE(review): that happens inside rds_cmsg_rdma_dest() /
		 * rds_cmsg_rdma_map(), which are not visible here.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			/* more ARGS cmsgs than rds_rm_size() accounted for */
			if (ind >= vct->indx)
				return -ENOMEM;
			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
			ind++;
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			else if (ret == -ENODEV)
				/* Accommodate the get_mr() case which can fail
				 * if connection isn't established yet.
				 */
				ret = -EAGAIN;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			ret = rds_cmsg_zcopy(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}

/*
 * Add the remote_vec.bytes of every SOL_RDS RDMA_ARGS cmsg in @msg to
 * *rdma_bytes (accumulates; the caller initializes it).  Returns -EINVAL
 * on a malformed cmsg, 0 otherwise.
 */
static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
	struct rds_rdma_args *args;
	struct cmsghdr *cmsg;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
			if (cmsg->cmsg_len <
			    CMSG_LEN(sizeof(struct rds_rdma_args)))
				return -EINVAL;
			args = CMSG_DATA(cmsg);
			*rdma_bytes += args->remote_vec.bytes;
		}
	}
	return 0;
}

int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);
	struct rds_conn_path *cpath;
	struct in6_addr daddr;
	__u32 scope_id = 0;
	size_t rdma_payload_len = 0;
	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
	int namelen;
	struct rds_iov_vector_arr vct;
	int ind;

	memset(&vct, 0, sizeof(vct));

	/* expect 1 RDMA CMSG per rds_sendmsg.
can still grow if more needed. */ 1156 vct.incr = 1; 1157 1158 /* Mirror Linux UDP mirror of BSD error message compatibility */ 1159 /* XXX: Perhaps MSG_MORE someday */ 1160 if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) { 1161 ret = -EOPNOTSUPP; 1162 goto out; 1163 } 1164 1165 namelen = msg->msg_namelen; 1166 if (namelen != 0) { 1167 if (namelen < sizeof(*usin)) { 1168 ret = -EINVAL; 1169 goto out; 1170 } 1171 switch (usin->sin_family) { 1172 case AF_INET: 1173 if (usin->sin_addr.s_addr == htonl(INADDR_ANY) || 1174 usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) || 1175 ipv4_is_multicast(usin->sin_addr.s_addr)) { 1176 ret = -EINVAL; 1177 goto out; 1178 } 1179 ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr); 1180 dport = usin->sin_port; 1181 break; 1182 1183 #if IS_ENABLED(CONFIG_IPV6) 1184 case AF_INET6: { 1185 int addr_type; 1186 1187 if (namelen < sizeof(*sin6)) { 1188 ret = -EINVAL; 1189 goto out; 1190 } 1191 addr_type = ipv6_addr_type(&sin6->sin6_addr); 1192 if (!(addr_type & IPV6_ADDR_UNICAST)) { 1193 __be32 addr4; 1194 1195 if (!(addr_type & IPV6_ADDR_MAPPED)) { 1196 ret = -EINVAL; 1197 goto out; 1198 } 1199 1200 /* It is a mapped address. Need to do some 1201 * sanity checks. 
1202 */ 1203 addr4 = sin6->sin6_addr.s6_addr32[3]; 1204 if (addr4 == htonl(INADDR_ANY) || 1205 addr4 == htonl(INADDR_BROADCAST) || 1206 ipv4_is_multicast(addr4)) { 1207 ret = -EINVAL; 1208 goto out; 1209 } 1210 } 1211 if (addr_type & IPV6_ADDR_LINKLOCAL) { 1212 if (sin6->sin6_scope_id == 0) { 1213 ret = -EINVAL; 1214 goto out; 1215 } 1216 scope_id = sin6->sin6_scope_id; 1217 } 1218 1219 daddr = sin6->sin6_addr; 1220 dport = sin6->sin6_port; 1221 break; 1222 } 1223 #endif 1224 1225 default: 1226 ret = -EINVAL; 1227 goto out; 1228 } 1229 } else { 1230 /* We only care about consistency with ->connect() */ 1231 lock_sock(sk); 1232 daddr = rs->rs_conn_addr; 1233 dport = rs->rs_conn_port; 1234 scope_id = rs->rs_bound_scope_id; 1235 release_sock(sk); 1236 } 1237 1238 lock_sock(sk); 1239 if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) { 1240 release_sock(sk); 1241 ret = -ENOTCONN; 1242 goto out; 1243 } else if (namelen != 0) { 1244 /* Cannot send to an IPv4 address using an IPv6 source 1245 * address and cannot send to an IPv6 address using an 1246 * IPv4 source address. 1247 */ 1248 if (ipv6_addr_v4mapped(&daddr) ^ 1249 ipv6_addr_v4mapped(&rs->rs_bound_addr)) { 1250 release_sock(sk); 1251 ret = -EOPNOTSUPP; 1252 goto out; 1253 } 1254 /* If the socket is already bound to a link local address, 1255 * it can only send to peers on the same link. But allow 1256 * communicating between link local and non-link local address. 
1257 */ 1258 if (scope_id != rs->rs_bound_scope_id) { 1259 if (!scope_id) { 1260 scope_id = rs->rs_bound_scope_id; 1261 } else if (rs->rs_bound_scope_id) { 1262 release_sock(sk); 1263 ret = -EINVAL; 1264 goto out; 1265 } 1266 } 1267 } 1268 release_sock(sk); 1269 1270 ret = rds_rdma_bytes(msg, &rdma_payload_len); 1271 if (ret) 1272 goto out; 1273 1274 if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) { 1275 ret = -EMSGSIZE; 1276 goto out; 1277 } 1278 1279 if (payload_len > rds_sk_sndbuf(rs)) { 1280 ret = -EMSGSIZE; 1281 goto out; 1282 } 1283 1284 if (zcopy) { 1285 if (rs->rs_transport->t_type != RDS_TRANS_TCP) { 1286 ret = -EOPNOTSUPP; 1287 goto out; 1288 } 1289 num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX); 1290 } 1291 /* size of rm including all sgs */ 1292 ret = rds_rm_size(msg, num_sgs, &vct); 1293 if (ret < 0) 1294 goto out; 1295 1296 rm = rds_message_alloc(ret, GFP_KERNEL); 1297 if (!rm) { 1298 ret = -ENOMEM; 1299 goto out; 1300 } 1301 1302 /* Attach data to the rm */ 1303 if (payload_len) { 1304 rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs); 1305 if (IS_ERR(rm->data.op_sg)) { 1306 ret = PTR_ERR(rm->data.op_sg); 1307 goto out; 1308 } 1309 ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy); 1310 if (ret) 1311 goto out; 1312 } 1313 rm->data.op_active = 1; 1314 1315 rm->m_daddr = daddr; 1316 1317 /* rds_conn_create has a spinlock that runs with IRQ off. 1318 * Caching the conn in the socket helps a lot. 
*/ 1319 if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) && 1320 rs->rs_tos == rs->rs_conn->c_tos) { 1321 conn = rs->rs_conn; 1322 } else { 1323 conn = rds_conn_create_outgoing(sock_net(sock->sk), 1324 &rs->rs_bound_addr, &daddr, 1325 rs->rs_transport, rs->rs_tos, 1326 sock->sk->sk_allocation, 1327 scope_id); 1328 if (IS_ERR(conn)) { 1329 ret = PTR_ERR(conn); 1330 goto out; 1331 } 1332 rs->rs_conn = conn; 1333 } 1334 1335 if (conn->c_trans->t_mp_capable) { 1336 /* Use c_path[0] until we learn that 1337 * the peer supports more (c_npaths > 1) 1338 */ 1339 cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)]; 1340 } else { 1341 cpath = &conn->c_path[0]; 1342 } 1343 1344 /* If we're multipath capable and path 0 is down, queue reconnect 1345 * and send a ping. This initiates the multipath handshake through 1346 * rds_send_probe(), which sends RDS_EXTHDR_NPATHS to the peer, 1347 * starting multipath capability negotiation. 1348 */ 1349 if (conn->c_trans->t_mp_capable && 1350 !rds_conn_path_up(&conn->c_path[0])) { 1351 /* Ensures that only one request is queued. And 1352 * rds_send_ping() ensures that only one ping is 1353 * outstanding. 1354 */ 1355 if (!test_and_set_bit(RDS_RECONNECT_PENDING, 1356 &conn->c_path[0].cp_flags)) 1357 queue_delayed_work(conn->c_path[0].cp_wq, 1358 &conn->c_path[0].cp_conn_w, 0); 1359 rds_send_ping(conn, 0); 1360 } 1361 1362 rm->m_conn_path = cpath; 1363 1364 /* Parse any control messages the user may have included. 
*/ 1365 ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct); 1366 if (ret) 1367 goto out; 1368 1369 if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) { 1370 printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", 1371 &rm->rdma, conn->c_trans->xmit_rdma); 1372 ret = -EOPNOTSUPP; 1373 goto out; 1374 } 1375 1376 if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) { 1377 printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", 1378 &rm->atomic, conn->c_trans->xmit_atomic); 1379 ret = -EOPNOTSUPP; 1380 goto out; 1381 } 1382 1383 if (rds_destroy_pending(conn)) { 1384 ret = -EAGAIN; 1385 goto out; 1386 } 1387 1388 if (rds_conn_path_down(cpath)) 1389 rds_check_all_paths(conn); 1390 1391 ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); 1392 if (ret) { 1393 WRITE_ONCE(rs->rs_seen_congestion, 1); 1394 goto out; 1395 } 1396 while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, 1397 dport, &queued)) { 1398 rds_stats_inc(s_send_queue_full); 1399 1400 if (nonblock) { 1401 ret = -EAGAIN; 1402 goto out; 1403 } 1404 1405 timeo = wait_event_interruptible_timeout(*sk_sleep(sk), 1406 rds_send_queue_rm(rs, conn, cpath, rm, 1407 rs->rs_bound_port, 1408 dport, 1409 &queued), 1410 timeo); 1411 rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo); 1412 if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT) 1413 continue; 1414 1415 ret = timeo; 1416 if (ret == 0) 1417 ret = -ETIMEDOUT; 1418 goto out; 1419 } 1420 1421 /* 1422 * By now we've committed to the send. We reuse rds_send_worker() 1423 * to retry sends in the rds thread if the transport asks us to. 
1424 */ 1425 rds_stats_inc(s_send_queued); 1426 1427 ret = rds_send_xmit(cpath); 1428 if (ret == -ENOMEM || ret == -EAGAIN) { 1429 ret = 0; 1430 rcu_read_lock(); 1431 if (rds_destroy_pending(cpath->cp_conn)) 1432 ret = -ENETUNREACH; 1433 else 1434 queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1); 1435 rcu_read_unlock(); 1436 1437 if (ret) 1438 goto out; 1439 } 1440 1441 rds_message_put(rm); 1442 1443 for (ind = 0; ind < vct.indx; ind++) 1444 kfree(vct.vec[ind].iov); 1445 kfree(vct.vec); 1446 1447 return payload_len; 1448 1449 out: 1450 for (ind = 0; ind < vct.indx; ind++) 1451 kfree(vct.vec[ind].iov); 1452 kfree(vct.vec); 1453 1454 /* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. 1455 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN 1456 * or in any other way, we need to destroy the MR again */ 1457 if (allocated_mr) 1458 rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1); 1459 1460 if (rm) 1461 rds_message_put(rm); 1462 return ret; 1463 } 1464 1465 /* 1466 * send out a probe. Can be shared by rds_send_ping, 1467 * rds_send_pong, rds_send_hb. 
1468 * rds_send_hb should use h_flags 1469 * RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED 1470 * or 1471 * RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED 1472 */ 1473 static int 1474 rds_send_probe(struct rds_conn_path *cp, __be16 sport, 1475 __be16 dport, u8 h_flags) 1476 { 1477 struct rds_message *rm; 1478 unsigned long flags; 1479 int ret = 0; 1480 1481 rm = rds_message_alloc(0, GFP_ATOMIC); 1482 if (!rm) { 1483 ret = -ENOMEM; 1484 goto out; 1485 } 1486 1487 rm->m_daddr = cp->cp_conn->c_faddr; 1488 rm->data.op_active = 1; 1489 1490 rds_conn_path_connect_if_down(cp); 1491 1492 ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); 1493 if (ret) 1494 goto out; 1495 1496 spin_lock_irqsave(&cp->cp_lock, flags); 1497 list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); 1498 set_bit(RDS_MSG_ON_CONN, &rm->m_flags); 1499 rds_message_addref(rm); 1500 rm->m_inc.i_conn = cp->cp_conn; 1501 rm->m_inc.i_conn_path = cp; 1502 1503 rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 1504 cp->cp_next_tx_seq); 1505 rm->m_inc.i_hdr.h_flags |= h_flags; 1506 cp->cp_next_tx_seq++; 1507 1508 if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) && 1509 cp->cp_conn->c_trans->t_mp_capable) { 1510 __be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS); 1511 __be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num); 1512 u8 dummy = 0; 1513 1514 rds_message_add_extension(&rm->m_inc.i_hdr, 1515 RDS_EXTHDR_NPATHS, &npaths); 1516 rds_message_add_extension(&rm->m_inc.i_hdr, 1517 RDS_EXTHDR_GEN_NUM, 1518 &my_gen_num); 1519 rds_message_add_extension(&rm->m_inc.i_hdr, 1520 RDS_EXTHDR_SPORT_IDX, 1521 &dummy); 1522 } 1523 spin_unlock_irqrestore(&cp->cp_lock, flags); 1524 1525 rds_stats_inc(s_send_queued); 1526 rds_stats_inc(s_send_pong); 1527 1528 /* schedule the send work on cp_wq */ 1529 rcu_read_lock(); 1530 if (!rds_destroy_pending(cp->cp_conn)) 1531 queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1); 1532 rcu_read_unlock(); 1533 1534 rds_message_put(rm); 1535 return 0; 1536 1537 out: 1538 if (rm) 1539 
rds_message_put(rm); 1540 return ret; 1541 } 1542 1543 int 1544 rds_send_pong(struct rds_conn_path *cp, __be16 dport) 1545 { 1546 return rds_send_probe(cp, 0, dport, 0); 1547 } 1548 1549 void 1550 rds_send_ping(struct rds_connection *conn, int cp_index) 1551 { 1552 unsigned long flags; 1553 struct rds_conn_path *cp = &conn->c_path[cp_index]; 1554 1555 spin_lock_irqsave(&cp->cp_lock, flags); 1556 if (conn->c_ping_triggered) { 1557 spin_unlock_irqrestore(&cp->cp_lock, flags); 1558 return; 1559 } 1560 conn->c_ping_triggered = 1; 1561 spin_unlock_irqrestore(&cp->cp_lock, flags); 1562 rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0); 1563 } 1564 EXPORT_SYMBOL_GPL(rds_send_ping); 1565