/*
 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlockup watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

static void rds_send_remove_from_sock(struct list_head *messages, int status);

/*
 * Reset the send state.  Callers must ensure that this doesn't race with
 * rds_send_xmit().
 */
void rds_send_path_reset(struct rds_conn_path *cp)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (cp->cp_xmit_rm) {
		rm = cp->cp_xmit_rm;
		cp->cp_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport.  This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	cp->cp_xmit_sg = 0;
	cp->cp_xmit_hdr_off = 0;
	cp->cp_xmit_data_off = 0;
	cp->cp_xmit_atomic_sent = 0;
	cp->cp_xmit_rdma_sent = 0;
	cp->cp_xmit_data_sent = 0;

	cp->cp_conn->c_map_queued = 0;

	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&cp->cp_lock, flags);
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
	spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);
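
/*
 * RDS_IN_XMIT acts as a per-path transmit lock: test_and_set_bit_lock()
 * provides acquire ordering and clear_bit_unlock() the matching release,
 * so at most one caller works a path's send state at a time without a
 * spinlock in the hot path.
 */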

static int acquire_in_xmit(struct rds_conn_path *cp)
{
	return test_and_set_bit_lock(RDS_IN_XMIT, &cp->cp_flags) == 0;
}

static void release_in_xmit(struct rds_conn_path *cp)
{
	clear_bit_unlock(RDS_IN_XMIT, &cp->cp_flags);
	/*
	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&cp->cp_waitq))
		wake_up_all(&cp->cp_waitq);
}

/*
 * Helper function for multipath fanout to ensure lane 0 transmits queued
 * messages before other lanes to prevent out-of-order delivery.
 *
 * Returns true if lane 0 still has messages or false otherwise
 */
static bool rds_mprds_cp0_catchup(struct rds_connection *conn)
{
	struct rds_conn_path *cp0 = conn->c_path;
	struct rds_message *rm0;
	unsigned long flags;
	bool ret = false;

	spin_lock_irqsave(&cp0->cp_lock, flags);

	/* the oldest / first message in the retransmit queue
	 * has to be at or beyond c_cp0_mprds_catchup_tx_seq
	 */
	if (!list_empty(&cp0->cp_retrans)) {
		rm0 = list_entry(cp0->cp_retrans.next, struct rds_message,
				 m_conn_item);
		if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
		    conn->c_cp0_mprds_catchup_tx_seq) {
			/* the retransmit queue of cp_index#0 has not
			 * quite caught up yet
			 */
			ret = true;
			goto unlock;
		}
	}

	/* the oldest / first message of the send queue
	 * has to be at or beyond c_cp0_mprds_catchup_tx_seq
	 */
	rm0 = cp0->cp_xmit_rm;
	if (!rm0 && !list_empty(&cp0->cp_send_queue))
		rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message,
				 m_conn_item);
	if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
	    conn->c_cp0_mprds_catchup_tx_seq) {
		/* the send queue of cp_index#0 has not quite
		 * caught up yet
		 */
		ret = true;
	}

unlock:
	spin_unlock_irqrestore(&cp0->cp_lock, flags);
	return ret;
}
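
/*
 * Illustration of the gate above (made-up numbers): if
 * c_cp0_mprds_catchup_tx_seq is 100, lanes other than cp_index#0 hold off
 * until lane 0 has nothing queued or in retransmit with h_sequence below
 * 100, so messages queued before fan-out started are delivered in order.
 */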

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *  - tx queueing is a simple fifo list
 *  - reassembly is optional and easily done by transports per conn
 *  - no per flow rx lookup at all, straight to the socket
 *  - less per-frag memory and wire overhead
 * Con:
 *  - queued acks can be delayed behind large messages
 * Depends:
 *  - small message latency is higher behind queued large messages
 *  - large message latency isn't starved by intervening small sends
 */
int rds_send_xmit(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;
	int same_rm = 0;

restart:
	batch_count = 0;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!acquire_in_xmit(cp)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (rds_destroy_pending(cp->cp_conn)) {
		release_in_xmit(cp);
		ret = -ENETUNREACH; /* don't requeue send work */
		goto out;
	}

	/*
	 * we record the send generation after doing the xmit acquire.
	 * if someone else manages to jump in and do some work, we'll use
	 * this to avoid a goto restart farther down.
	 *
	 * The acquire_in_xmit() check above ensures that only one
	 * caller can increment c_send_gen at any time.
	 */
	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
	WRITE_ONCE(cp->cp_send_gen, send_gen);

	/*
	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
	 * we do the opposite to avoid races.
	 */
	if (!rds_conn_path_up(cp)) {
		release_in_xmit(cp);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_path_prepare)
		conn->c_trans->xmit_path_prepare(cp);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = cp->cp_xmit_rm;

		if (!rm) {
			same_rm = 0;
		} else {
			same_rm++;
			if (same_rm >= 4096) {
				rds_stats_inc(s_send_stuck_rm);
				ret = -EAGAIN;
				break;
			}
		}

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;
			rm->m_inc.i_conn_path = cp;
			rm->m_inc.i_conn = cp->cp_conn;

			cp->cp_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * cp_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem.. rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			batch_count++;

			/* we want to process as big a batch as we can, but
			 * we also want to avoid softlockups.  If we've been
			 * through a lot of messages, let's back off and see
			 * if anyone else jumps in
			 */
			if (batch_count >= send_batch_count)
				goto over_batch;
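
			/* Hitting the batch limit above does not strand the
			 * queue: once over_batch releases RDS_IN_XMIT, the
			 * tail of this function requeues cp_send_w so a
			 * worker finishes the job while we yield the CPU.
			 */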

			/* make sure cp_index#0 caught up during fan-out in
			 * order to avoid lane races
			 */
			if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) {
				rds_stats_inc(s_mprds_catchup_tx0_retries);
				goto over_batch;
			}

			spin_lock_irqsave(&cp->cp_lock, flags);

			if (!list_empty(&cp->cp_send_queue)) {
				rm = list_entry(cp->cp_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item,
					       &cp->cp_retrans);
			}

			spin_unlock_irqrestore(&cp->cp_lock, flags);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
			    (rm->rdma.op_active &&
			     test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
				spin_lock_irqsave(&cp->cp_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&cp->cp_lock, flags);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (cp->cp_unacked_packets == 0 ||
			    cp->cp_unacked_bytes < len) {
				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				cp->cp_unacked_packets =
					rds_sysctl_max_unacked_packets;
				cp->cp_unacked_bytes =
					rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				cp->cp_unacked_bytes -= len;
				cp->cp_unacked_packets--;
			}

			cp->cp_xmit_rm = rm;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_rdma_sent = 1;

		}

		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_atomic_sent = 1;

		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}

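		/* Transmit the header and any data.  The transport returns
		 * how many bytes it consumed; cp_xmit_hdr_off, cp_xmit_sg
		 * and cp_xmit_data_off record partial progress so a later
		 * pass resumes exactly where this one stopped.
		 */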
		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
			rm->m_final_op = &rm->data;

			ret = conn->c_trans->xmit(conn, rm,
						  cp->cp_xmit_hdr_off,
						  cp->cp_xmit_sg,
						  cp->cp_xmit_data_off);
			if (ret <= 0)
				break;

			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    cp->cp_xmit_hdr_off);
				cp->cp_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[cp->cp_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      cp->cp_xmit_data_off);
				cp->cp_xmit_data_off += tmp;
				ret -= tmp;
				if (cp->cp_xmit_data_off == sg->length) {
					cp->cp_xmit_data_off = 0;
					sg++;
					cp->cp_xmit_sg++;
					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
					       rm->data.op_nents);
				}
			}

			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
			    (cp->cp_xmit_sg == rm->data.op_nents))
				cp->cp_xmit_data_sent = 1;
		}

		/*
		 * An rm will only take multiple trips through this loop
		 * if there is a data op. Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
			cp->cp_xmit_rm = NULL;
			cp->cp_xmit_sg = 0;
			cp->cp_xmit_hdr_off = 0;
			cp->cp_xmit_data_off = 0;
			cp->cp_xmit_rdma_sent = 0;
			cp->cp_xmit_atomic_sent = 0;
			cp->cp_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

over_batch:
	if (conn->c_trans->xmit_path_complete)
		conn->c_trans->xmit_path_complete(cp);
	release_in_xmit(cp);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

	/*
	 * Other senders can queue a message after we last test the send queue
	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
	 * not try and send their newly queued message.  We need to check the
	 * send queue after having cleared RDS_IN_XMIT so that their message
	 * doesn't get stuck on the send queue.
	 *
	 * If the transport cannot continue (i.e. ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 *
	 * We have an extra generation check here so that if someone manages
	 * to jump in after our release_in_xmit, we'll see that they have done
	 * some work and we will skip our goto
	 */
	if (ret == 0) {
		bool raced;

		smp_mb();
		raced = send_gen != READ_ONCE(cp->cp_send_gen);

		if ((test_bit(0, &conn->c_map_queued) ||
		     !list_empty(&cp->cp_send_queue)) && !raced) {
			if (batch_count < send_batch_count)
				goto restart;
			rcu_read_lock();
			if (rds_destroy_pending(cp->cp_conn))
				ret = -ENETUNREACH;
			else
				queue_delayed_work(cp->cp_wq,
						   &cp->cp_send_w, 1);
			rcu_read_unlock();
		} else if (raced) {
			rds_stats_inc(s_send_lock_queue_raced);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);
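
/*
 * rds_send_xmit() is driven from several places: directly from
 * rds_sendmsg() once a message is queued, from the cp_send_w worker when a
 * pass gets requeued, and (per the comment above) from transports when tx
 * room frees up.  All of them funnel through acquire_in_xmit(), so a
 * path's transmit cursor is only ever advanced by one caller at a time.
 */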

static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except looks at atomic op
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
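
/*
 * The two completion handlers above can run from transport completion
 * context while the socket is being torn down; that is presumably why they
 * take m_rs_lock, pin the socket with sock_hold() while queueing the
 * notifier, and only wake the socket after dropping the lock.
 */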

/*
 * This is the same as rds_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 */
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
			      is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&cp->cp_lock, flags);

	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&cp->cp_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);

void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	WARN_ON(conn->c_trans->t_mp_capable);
	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	struct rds_conn_path *cp;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest &&
		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;
		if (conn->c_trans->t_mp_capable)
			cp = rm->m_inc.i_conn_path;
		else
			cp = &conn->c_path[0];

		spin_lock_irqsave(&cp->cp_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&cp->cp_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&cp->cp_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		/* just in case the code above skipped this message
		 * because RDS_MSG_ON_CONN wasn't set, run it again here.
		 * Taking m_rs_lock is the only thing that keeps us
		 * from racing with ack processing.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_conn_path *cp,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		sock_hold(rds_rs_to_sk(rs));
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rm->m_inc.i_conn_path = cp;
		rds_message_addref(rm);

		spin_lock(&cp->cp_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&cp->cp_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}

/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go. This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int num_sgs,
		       struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;
	int retval;
	bool zcopy_cookie = false;
	struct rds_iov_vector *iov, *tmp_iov;

	if (num_sgs < 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (vct->indx >= vct->len) {
				vct->len += vct->incr;
				tmp_iov =
					krealloc(vct->vec,
						 vct->len *
						 sizeof(struct rds_iov_vector),
						 GFP_KERNEL);
				if (!tmp_iov) {
					vct->len -= vct->incr;
					return -ENOMEM;
				}
				vct->vec = tmp_iov;
			}
			iov = &vct->vec[vct->indx];
			memset(iov, 0, sizeof(struct rds_iov_vector));
			vct->indx++;
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			zcopy_cookie = true;
			fallthrough;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
		return -EINVAL;

	size += num_sgs * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
			  struct cmsghdr *cmsg)
{
	u32 *cookie;

	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
	    !rm->data.op_mmp_znotifier)
		return -EINVAL;
	cookie = CMSG_DATA(cmsg);
	rm->data.op_mmp_znotifier->z_cookie = *cookie;
	return 0;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr,
			 struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int ret = 0, ind = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (ind >= vct->indx)
				return -ENOMEM;
			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
			ind++;
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			else if (ret == -ENODEV)
				/* Accommodate the get_mr() case which can fail
				 * if connection isn't established yet.
				 */
				ret = -EAGAIN;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			ret = rds_cmsg_zcopy(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}
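
/*
 * rds_rm_size() and rds_cmsg_send() walk the same cmsg list in the same
 * order: the former sizes the message and fills vct, the latter consumes
 * vct->vec[ind] for each RDS_CMSG_RDMA_ARGS it meets, so the two switches
 * must be kept in step when cmsg types are added.
 */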

static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
	struct rds_rdma_args *args;
	struct cmsghdr *cmsg;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
			if (cmsg->cmsg_len <
			    CMSG_LEN(sizeof(struct rds_rdma_args)))
				return -EINVAL;
			args = CMSG_DATA(cmsg);
			*rdma_bytes += args->remote_vec.bytes;
		}
	}
	return 0;
}

int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);
	struct rds_conn_path *cpath;
	struct in6_addr daddr;
	__u32 scope_id = 0;
	size_t rdma_payload_len = 0;
	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
	int namelen;
	struct rds_iov_vector_arr vct;
	int ind;

	memset(&vct, 0, sizeof(vct));

	/* expect 1 RDMA CMSG per rds_sendmsg. can still grow if more needed. */
	vct.incr = 1;

	/* Mirror Linux UDP's mirroring of BSD error message compatibility */
	/* XXX: Perhaps MSG_MORE someday */
	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	namelen = msg->msg_namelen;
	if (namelen != 0) {
		if (namelen < sizeof(*usin)) {
			ret = -EINVAL;
			goto out;
		}
		switch (usin->sin_family) {
		case AF_INET:
			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
			    ipv4_is_multicast(usin->sin_addr.s_addr)) {
				ret = -EINVAL;
				goto out;
			}
			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
			dport = usin->sin_port;
			break;

#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			int addr_type;

			if (namelen < sizeof(*sin6)) {
				ret = -EINVAL;
				goto out;
			}
			addr_type = ipv6_addr_type(&sin6->sin6_addr);
			if (!(addr_type & IPV6_ADDR_UNICAST)) {
				__be32 addr4;

				if (!(addr_type & IPV6_ADDR_MAPPED)) {
					ret = -EINVAL;
					goto out;
				}

				/* It is a mapped address.  Need to do some
				 * sanity checks.
				 */
				addr4 = sin6->sin6_addr.s6_addr32[3];
				if (addr4 == htonl(INADDR_ANY) ||
				    addr4 == htonl(INADDR_BROADCAST) ||
				    ipv4_is_multicast(addr4)) {
					ret = -EINVAL;
					goto out;
				}
			}
			if (addr_type & IPV6_ADDR_LINKLOCAL) {
				if (sin6->sin6_scope_id == 0) {
					ret = -EINVAL;
					goto out;
				}
				scope_id = sin6->sin6_scope_id;
			}

			daddr = sin6->sin6_addr;
			dport = sin6->sin6_port;
			break;
		}
#endif

		default:
			ret = -EINVAL;
			goto out;
		}
	} else {
		/* We only care about consistency with ->connect() */
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		scope_id = rs->rs_bound_scope_id;
		release_sock(sk);
	}

	lock_sock(sk);
	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
		release_sock(sk);
		ret = -ENOTCONN;
		goto out;
	} else if (namelen != 0) {
		/* Cannot send to an IPv4 address using an IPv6 source
		 * address and cannot send to an IPv6 address using an
		 * IPv4 source address.
		 */
		if (ipv6_addr_v4mapped(&daddr) ^
		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
			release_sock(sk);
			ret = -EOPNOTSUPP;
			goto out;
		}
		/* If the socket is already bound to a link local address,
		 * it can only send to peers on the same link.  But allow
		 * communicating between link local and non-link local address.
		 */
		if (scope_id != rs->rs_bound_scope_id) {
			if (!scope_id) {
				scope_id = rs->rs_bound_scope_id;
			} else if (rs->rs_bound_scope_id) {
				release_sock(sk);
				ret = -EINVAL;
				goto out;
			}
		}
	}
	release_sock(sk);

	ret = rds_rdma_bytes(msg, &rdma_payload_len);
	if (ret)
		goto out;

	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (zcopy) {
		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
			ret = -EOPNOTSUPP;
			goto out;
		}
		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
	}
	/* size of rm including all sgs */
	ret = rds_rm_size(msg, num_sgs, &vct);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	/* Attach data to the rm */
	if (payload_len) {
		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
		if (IS_ERR(rm->data.op_sg)) {
			ret = PTR_ERR(rm->data.op_sg);
			goto out;
		}
		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
		if (ret)
			goto out;
	}
	rm->data.op_active = 1;

	rm->m_daddr = daddr;

	/* rds_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot. */
	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
	    rs->rs_tos == rs->rs_conn->c_tos) {
		conn = rs->rs_conn;
	} else {
		conn = rds_conn_create_outgoing(sock_net(sock->sk),
						&rs->rs_bound_addr, &daddr,
						rs->rs_transport, rs->rs_tos,
						sock->sk->sk_allocation,
						scope_id);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	if (conn->c_trans->t_mp_capable) {
		/* Use c_path[0] until we learn that
		 * the peer supports more (c_npaths > 1)
		 */
		cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ?
								       : 1)];
	} else {
		cpath = &conn->c_path[0];
	}
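
	/* Lane choice: RDS_MPATH_HASH() picks one of the advertised lanes
	 * for this socket; until the handshake completes c_npaths is 0, so
	 * the "?: 1" fallback pins everything to lane 0.
	 */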

	/* If we're multipath capable and path 0 is down, queue reconnect
	 * and send a ping.  This initiates the multipath handshake through
	 * rds_send_probe(), which sends RDS_EXTHDR_NPATHS to the peer,
	 * starting multipath capability negotiation.
	 */
	if (conn->c_trans->t_mp_capable &&
	    !rds_conn_path_up(&conn->c_path[0])) {
		/* Ensures that only one request is queued. And
		 * rds_send_ping() ensures that only one ping is
		 * outstanding.
		 */
		if (!test_and_set_bit(RDS_RECONNECT_PENDING,
				      &conn->c_path[0].cp_flags))
			queue_delayed_work(conn->c_path[0].cp_wq,
					   &conn->c_path[0].cp_conn_w, 0);
		rds_send_ping(conn, 0);
	}

	rm->m_conn_path = cpath;

	/* Parse any control messages the user may have included. */
	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
	if (ret)
		goto out;

	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
				   &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
				   &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rds_destroy_pending(conn)) {
		ret = -EAGAIN;
		goto out;
	}

	if (rds_conn_path_down(cpath))
		rds_check_all_paths(conn);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}
	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);

		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
							 rds_send_queue_rm(rs, conn, cpath, rm,
									   rs->rs_bound_port,
									   dport,
									   &queued),
							 timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/*
	 * By now we've committed to the send.  We reuse rds_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(cpath);
	if (ret == -ENOMEM || ret == -EAGAIN) {
		ret = 0;
		rcu_read_lock();
		if (rds_destroy_pending(cpath->cp_conn))
			ret = -ENETUNREACH;
		else
			queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1);
		rcu_read_unlock();
	}
	if (ret)
		goto out;
	rds_message_put(rm);

	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	return payload_len;

out:
	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	/* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}

/*
 * send out a probe.
 * Can be shared by rds_send_ping,
 * rds_send_pong, rds_send_hb.
 * rds_send_hb should use h_flags
 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
 * or
 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
 */
static int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
	       __be16 dport, u8 h_flags)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = cp->cp_conn->c_faddr;
	rm->data.op_active = 1;

	rds_conn_path_connect_if_down(cp);

	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&cp->cp_lock, flags);
	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = cp->cp_conn;
	rm->m_inc.i_conn_path = cp;

	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
				    cp->cp_next_tx_seq);
	rm->m_inc.i_hdr.h_flags |= h_flags;
	cp->cp_next_tx_seq++;

	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
	    cp->cp_conn->c_trans->t_mp_capable) {
		__be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
		__be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
		u8 dummy = 0;

		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_NPATHS, &npaths);
		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_GEN_NUM,
					  &my_gen_num);
		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_SPORT_IDX,
					  &dummy);
	}
	spin_unlock_irqrestore(&cp->cp_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	/* schedule the send work on cp_wq */
	rcu_read_lock();
	if (!rds_destroy_pending(cp->cp_conn))
		queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1);
	rcu_read_unlock();

	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
	return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn, int cp_index)
{
	unsigned long flags;
	struct rds_conn_path *cp = &conn->c_path[cp_index];

	spin_lock_irqsave(&cp->cp_lock, flags);
	if (conn->c_ping_triggered) {
		spin_unlock_irqrestore(&cp->cp_lock, flags);
		return;
	}
	conn->c_ping_triggered = 1;
	spin_unlock_irqrestore(&cp->cp_lock, flags);
	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
}
EXPORT_SYMBOL_GPL(rds_send_ping);