/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
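
/*
 * Editor's note (not from the original source): send_batch_count is
 * registered with mode 0444, so it is visible under
 * /sys/module/rds/parameters/ but read-only at runtime.  It is meant to
 * be set when the module is loaded, for example (illustrative value):
 *
 *	modprobe rds send_batch_count=64
 *
 * or, if RDS is built in, via rds.send_batch_count=64 on the kernel
 * command line.
 */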

static void rds_send_remove_from_sock(struct list_head *messages, int status);

/*
 * Reset the send state.  Callers must ensure that this doesn't race with
 * rds_send_xmit().
 */
void rds_send_reset(struct rds_connection *conn)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		conn->c_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_atomic_sent = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_xmit_data_sent = 0;

	conn->c_map_queued = 0;

	conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&conn->c_lock, flags);
	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
	spin_unlock_irqrestore(&conn->c_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_reset);

static int acquire_in_xmit(struct rds_connection *conn)
{
	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
}

static void release_in_xmit(struct rds_connection *conn)
{
	clear_bit(RDS_IN_XMIT, &conn->c_flags);
	smp_mb__after_atomic();
	/*
	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&conn->c_waitq))
		wake_up_all(&conn->c_waitq);
}
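
/*
 * Editor's note (not from the original source): RDS_IN_XMIT acts as a
 * simple trylock around rds_send_xmit().  test_and_set_bit() returns the
 * previous bit value, so acquire_in_xmit() succeeds only for the caller
 * that actually flipped the bit from 0 to 1.  release_in_xmit() clears it
 * and wakes any sleepers on c_waitq (for example, connection shutdown
 * waits for this bit to drop before tearing the transmit path down).
 */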

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 *   Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 *   Con:
 *      - queued acks can be delayed behind large messages
 *   Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 */
int rds_send_xmit(struct rds_connection *conn)
{
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;

restart:
	batch_count = 0;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!acquire_in_xmit(conn)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	/*
	 * we record the send generation after doing the xmit acquire.
	 * if someone else manages to jump in and do some work, we'll use
	 * this to avoid a goto restart farther down.
	 *
	 * The acquire_in_xmit() check above ensures that only one
	 * caller can increment c_send_gen at any time.
	 */
	conn->c_send_gen++;
	send_gen = conn->c_send_gen;

	/*
	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
	 * we do the opposite to avoid races.
	 */
	if (!rds_conn_up(conn)) {
		release_in_xmit(conn);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = conn->c_xmit_rm;

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;

			conn->c_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem.. rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			batch_count++;

			/* we want to process as big a batch as we can, but
			 * we also want to avoid softlockups.  If we've been
			 * through a lot of messages, let's back off and see
			 * if anyone else jumps in
			 */
			if (batch_count >= send_batch_count)
				goto over_batch;

			spin_lock_irqsave(&conn->c_lock, flags);

			if (!list_empty(&conn->c_send_queue)) {
				rm = list_entry(conn->c_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
			}

			spin_unlock_irqrestore(&conn->c_lock, flags);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (rm->rdma.op_active &&
			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
				spin_lock_irqsave(&conn->c_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&conn->c_lock, flags);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
				conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			conn->c_xmit_rdma_sent = 1;

		}

		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			conn->c_xmit_atomic_sent = 1;

		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}

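		/*
		 * Editor's note (not from the original source): the
		 * c_trans->xmit() call below returns how many bytes it
		 * accepted (or <= 0 if it could not send).  Those bytes are
		 * charged first against the RDS header (c_xmit_hdr_off) and
		 * then walked across the data scatterlist (c_xmit_sg and
		 * c_xmit_data_off), so a large message can take several
		 * passes before c_xmit_data_sent is set.
		 */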
		if (rm->data.op_active && !conn->c_xmit_data_sent) {
			rm->m_final_op = &rm->data;
			ret = conn->c_trans->xmit(conn, rm,
						  conn->c_xmit_hdr_off,
						  conn->c_xmit_sg,
						  conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == sg->length) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					BUG_ON(ret != 0 &&
					       conn->c_xmit_sg == rm->data.op_nents);
				}
			}

			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
			    (conn->c_xmit_sg == rm->data.op_nents))
				conn->c_xmit_data_sent = 1;
		}

		/*
		 * A rm will only make multiple passes through this loop
		 * if there is a data op. Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || conn->c_xmit_data_sent) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;
			conn->c_xmit_atomic_sent = 0;
			conn->c_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

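	/*
	 * Editor's note (not from the original source): we land here when
	 * the send queue is drained, when the transport could not make
	 * forward progress (xmit returned <= 0, or an rdma/atomic op
	 * failed), or via the over_batch label once batch_count reaches
	 * send_batch_count.
	 */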
over_batch:
	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);
	release_in_xmit(conn);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

	/*
	 * Other senders can queue a message after we last test the send queue
	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
	 * not try and send their newly queued message. We need to check the
	 * send queue after having cleared RDS_IN_XMIT so that their message
	 * doesn't get stuck on the send queue.
	 *
	 * If the transport cannot continue (i.e. ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 *
	 * We have an extra generation check here so that if someone manages
	 * to jump in after our release_in_xmit, we'll see that they have done
	 * some work and we will skip our goto restart.
	 */
	if (ret == 0) {
		smp_mb();
		if ((test_bit(0, &conn->c_map_queued) ||
		     !list_empty(&conn->c_send_queue)) &&
		    send_gen == conn->c_send_gen) {
			rds_stats_inc(s_send_lock_queue_raced);
			if (batch_count < send_batch_count)
				goto restart;
			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);

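/*
 * Editor's note (not from the original source): rs_snd_bytes counts payload
 * bytes that are queued on the socket but not yet released by ack/completion
 * handling.  rds_send_queue_rm() adds to it and checks it against the
 * socket's sndbuf; it is dropped here once the message comes off the socket.
 */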
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except looks at atomic op
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

/*
 * This is the same as rds_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * a RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 */
struct rds_message *rds_send_get_message(struct rds_connection *conn,
					 struct rm_rdma_op *op)
{
	struct rds_message *rm, *tmp, *found = NULL;
	unsigned long flags;

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			goto out;
		}
	}

	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			break;
		}
	}

out:
	spin_unlock_irqrestore(&conn->c_lock, flags);

	return found;
}
EXPORT_SYMBOL_GPL(rds_send_get_message);

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

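		/*
		 * Editor's note (not from the original source): messages on
		 * the private list can belong to different sockets; 'rs'
		 * caches the previous message's socket so we only hold/put
		 * the sock and issue the wakeup once per run of messages
		 * for the same socket.
		 */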
		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 */
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&conn->c_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

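/*
 * Editor's note (not from the original source): flush messages this socket
 * has queued, optionally restricted to a single destination address/port.
 * A NULL 'dest' drops everything (e.g. on socket teardown);
 * RDS_CANCEL_SENT_TO is the other typical caller.
 */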
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
			     dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;

		spin_lock_irqsave(&conn->c_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&conn->c_lock, flags);
			spin_lock_irqsave(&rm->m_rs_lock, flags);
			rm->m_rs = NULL;
			spin_unlock_irqrestore(&rm->m_rs_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&conn->c_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		/* just in case the code above skipped this message
		 * because RDS_MSG_ON_CONN wasn't set, run it again here;
		 * taking m_rs_lock is the only thing that keeps us
		 * from racing with ack processing.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		   trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rds_message_addref(rm);

		spin_lock(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&conn->c_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}

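/*
 * Editor's note (not from the original source): rds_rm_size() and
 * rds_cmsg_send() below need to stay in sync; the former sizes the
 * rds_message allocation for each cmsg type that the latter will later
 * populate from the same control messages.
 */
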
/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go. This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int data_len)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;	/* bit 0: RDMA_ARGS/atomic ops, bit 1: RDMA_DEST/RDMA_MAP */
	int retval;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}

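/*
 * Editor's note (not from the original source): rds_sendmsg() resolves the
 * destination, sizes and allocates the rds_message, copies in the payload
 * and parses the control messages, looks up (or creates) the connection,
 * queues the message on the socket and connection, and then tries to push
 * it out directly with rds_send_xmit(), deferring to the send worker if
 * the transport pushes back.
 */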
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be32 daddr;
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);

	/* Mirror how Linux UDP mirrors BSD's error message compatibility */
	/* XXX: Perhaps MSG_MORE someday */
	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
			ret = -EINVAL;
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		release_sock(sk);
	}

	lock_sock(sk);
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		release_sock(sk);
		ret = -ENOTCONN; /* XXX not a great errno */
		goto out;
	}
	release_sock(sk);

	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}

	/* size of rm including all sgs */
	ret = rds_rm_size(msg, payload_len);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	/* Attach data to the rm */
	if (payload_len) {
		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
		if (!rm->data.op_sg) {
			ret = -ENOMEM;
			goto out;
		}
		ret = rds_message_copy_from_user(rm, &msg->msg_iter);
		if (ret)
			goto out;
	}
	rm->data.op_active = 1;

	rm->m_daddr = daddr;

	/* rds_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot. */
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
		conn = rs->rs_conn;
	else {
		conn = rds_conn_create_outgoing(sock_net(sock->sk),
						rs->rs_bound_addr, daddr,
						rs->rs_transport,
						sock->sk->sk_allocation);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	/* Parse any control messages the user may have included. */
	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret)
		goto out;

	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
				   &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
				   &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	rds_conn_connect_if_down(conn);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}

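	/*
	 * Editor's note (not from the original source): queue the message on
	 * the socket and connection.  If the send queue is full we either
	 * return -EAGAIN (nonblocking) or sleep, honoring the socket send
	 * timeout, until rds_send_queue_rm() succeeds.
	 */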
	while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);

		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					rds_send_queue_rm(rs, conn, rm,
							  rs->rs_bound_port,
							  dport,
							  &queued),
					timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/*
	 * By now we've committed to the send.  We reuse rds_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(conn);
	if (ret == -ENOMEM || ret == -EAGAIN)
		queue_delayed_work(rds_wq, &conn->c_send_w, 1);

	rds_message_put(rm);
	return payload_len;

out:
	/* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}

/*
 * Reply to a ping packet.
 */
int
rds_send_pong(struct rds_connection *conn, __be16 dport)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = conn->c_faddr;
	rm->data.op_active = 1;

	rds_conn_connect_if_down(conn);

	ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&conn->c_lock, flags);
	list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = conn;

	rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
				    conn->c_next_tx_seq);
	conn->c_next_tx_seq++;
	spin_unlock_irqrestore(&conn->c_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	/* schedule the send work on rds_wq */
	queue_delayed_work(rds_wq, &conn->c_send_w, 1);

	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}