1 /* 2 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 3 */ 4 5 /* 6 * This file contains code imported from the OFED rds source file send.c 7 * Oracle elects to have and use the contents of send.c under and governed 8 * by the OpenIB.org BSD license (see below for full license text). However, 9 * the following notice accompanied the original version of this file: 10 */ 11 12 /* 13 * Copyright (c) 2006 Oracle. All rights reserved. 14 * 15 * This software is available to you under a choice of one of two 16 * licenses. You may choose to be licensed under the terms of the GNU 17 * General Public License (GPL) Version 2, available from the file 18 * COPYING in the main directory of this source tree, or the 19 * OpenIB.org BSD license below: 20 * 21 * Redistribution and use in source and binary forms, with or 22 * without modification, are permitted provided that the following 23 * conditions are met: 24 * 25 * - Redistributions of source code must retain the above 26 * copyright notice, this list of conditions and the following 27 * disclaimer. 28 * 29 * - Redistributions in binary form must reproduce the above 30 * copyright notice, this list of conditions and the following 31 * disclaimer in the documentation and/or other materials 32 * provided with the distribution. 33 * 34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 35 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 36 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 37 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 38 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 39 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 40 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 41 * SOFTWARE. 42 * 43 */ 44 #include <sys/stropts.h> 45 #include <sys/systm.h> 46 47 #include <sys/rds.h> 48 #include <sys/socket.h> 49 #include <sys/socketvar.h> 50 51 #include <sys/ib/clients/rdsv3/rdsv3.h> 52 #include <sys/ib/clients/rdsv3/rdma.h> 53 #include <sys/ib/clients/rdsv3/rdsv3_debug.h> 54 55 /* 56 * When transmitting messages in rdsv3_send_xmit, we need to emerge from 57 * time to time and briefly release the CPU. Otherwise the softlock watchdog 58 * will kick our shin. 59 * Also, it seems fairer to not let one busy connection stall all the 60 * others. 61 * 62 * send_batch_count is the number of times we'll loop in send_xmit. Setting 63 * it to 0 will restore the old behavior (where we looped until we had 64 * drained the queue). 65 */ 66 static int send_batch_count = 64; 67 68 extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op); 69 /* 70 * Reset the send state. Caller must hold c_send_lock when calling here. 71 */ 72 void 73 rdsv3_send_reset(struct rdsv3_connection *conn) 74 { 75 struct rdsv3_message *rm, *tmp; 76 struct rdsv3_rdma_op *ro; 77 78 RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn); 79 80 ASSERT(MUTEX_HELD(&conn->c_send_lock)); 81 82 if (conn->c_xmit_rm) { 83 rm = conn->c_xmit_rm; 84 ro = rm->m_rdma_op; 85 if (ro && ro->r_mapped) { 86 RDSV3_DPRINTF2("rdsv3_send_reset", 87 "rm %p mflg 0x%x map %d mihdl %p sgl %p", 88 rm, rm->m_flags, ro->r_mapped, 89 ro->r_rdma_sg[0].mihdl, 90 ro->r_rdma_sg[0].swr.wr_sgl); 91 rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro); 92 } 93 /* 94 * Tell the user the RDMA op is no longer mapped by the 95 * transport. This isn't entirely true (it's flushed out 96 * independently) but as the connection is down, there's 97 * no ongoing RDMA to/from that memory 98 */ 99 rdsv3_message_unmapped(conn->c_xmit_rm); 100 rdsv3_message_put(conn->c_xmit_rm); 101 conn->c_xmit_rm = NULL; 102 } 103 104 conn->c_xmit_sg = 0; 105 conn->c_xmit_hdr_off = 0; 106 conn->c_xmit_data_off = 0; 107 conn->c_xmit_rdma_sent = 0; 108 conn->c_map_queued = 0; 109 110 conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets; 111 conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes; 112 113 /* Mark messages as retransmissions, and move them to the send q */ 114 mutex_enter(&conn->c_lock); 115 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 116 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 117 set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags); 118 if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) { 119 RDSV3_DPRINTF4("_send_reset", 120 "RT rm %p mflg 0x%x sgl %p", 121 rm, rm->m_flags, 122 rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl); 123 } 124 } 125 list_move_tail(&conn->c_send_queue, &conn->c_retrans); 126 mutex_exit(&conn->c_lock); 127 128 RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn); 129 } 130 131 /* 132 * We're making the concious trade-off here to only send one message 133 * down the connection at a time. 134 * Pro: 135 * - tx queueing is a simple fifo list 136 * - reassembly is optional and easily done by transports per conn 137 * - no per flow rx lookup at all, straight to the socket 138 * - less per-frag memory and wire overhead 139 * Con: 140 * - queued acks can be delayed behind large messages 141 * Depends: 142 * - small message latency is higher behind queued large messages 143 * - large message latency isn't starved by intervening small sends 144 */ 145 int 146 rdsv3_send_xmit(struct rdsv3_connection *conn) 147 { 148 struct rdsv3_message *rm; 149 unsigned int tmp; 150 unsigned int send_quota = send_batch_count; 151 struct rdsv3_scatterlist *sg; 152 int ret = 0; 153 int was_empty = 0; 154 list_t to_be_dropped; 155 156 restart: 157 if (!rdsv3_conn_up(conn)) 158 goto out; 159 160 RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn); 161 162 list_create(&to_be_dropped, sizeof (struct rdsv3_message), 163 offsetof(struct rdsv3_message, m_conn_item)); 164 165 /* 166 * sendmsg calls here after having queued its message on the send 167 * queue. We only have one task feeding the connection at a time. If 168 * another thread is already feeding the queue then we back off. This 169 * avoids blocking the caller and trading per-connection data between 170 * caches per message. 171 */ 172 if (!mutex_tryenter(&conn->c_send_lock)) { 173 RDSV3_DPRINTF4("rdsv3_send_xmit", 174 "Another thread running(conn: %p)", conn); 175 rdsv3_stats_inc(s_send_sem_contention); 176 ret = -ENOMEM; 177 goto out; 178 } 179 atomic_add_32(&conn->c_senders, 1); 180 181 if (conn->c_trans->xmit_prepare) 182 conn->c_trans->xmit_prepare(conn); 183 184 /* 185 * spin trying to push headers and data down the connection until 186 * the connection doesn't make forward progress. 187 */ 188 while (--send_quota) { 189 /* 190 * See if need to send a congestion map update if we're 191 * between sending messages. The send_sem protects our sole 192 * use of c_map_offset and _bytes. 193 * Note this is used only by transports that define a special 194 * xmit_cong_map function. For all others, we create allocate 195 * a cong_map message and treat it just like any other send. 196 */ 197 if (conn->c_map_bytes) { 198 ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, 199 conn->c_map_offset); 200 if (ret <= 0) 201 break; 202 203 conn->c_map_offset += ret; 204 conn->c_map_bytes -= ret; 205 if (conn->c_map_bytes) 206 continue; 207 } 208 209 /* 210 * If we're done sending the current message, clear the 211 * offset and S/G temporaries. 212 */ 213 rm = conn->c_xmit_rm; 214 if (rm != NULL && 215 conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) && 216 conn->c_xmit_sg == rm->m_nents) { 217 conn->c_xmit_rm = NULL; 218 conn->c_xmit_sg = 0; 219 conn->c_xmit_hdr_off = 0; 220 conn->c_xmit_data_off = 0; 221 conn->c_xmit_rdma_sent = 0; 222 223 /* Release the reference to the previous message. */ 224 rdsv3_message_put(rm); 225 rm = NULL; 226 } 227 228 /* If we're asked to send a cong map update, do so. */ 229 if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) { 230 if (conn->c_trans->xmit_cong_map != NULL) { 231 conn->c_map_offset = 0; 232 conn->c_map_bytes = 233 sizeof (struct rdsv3_header) + 234 RDSV3_CONG_MAP_BYTES; 235 continue; 236 } 237 238 rm = rdsv3_cong_update_alloc(conn); 239 if (IS_ERR(rm)) { 240 ret = PTR_ERR(rm); 241 break; 242 } 243 244 conn->c_xmit_rm = rm; 245 } 246 247 /* 248 * Grab the next message from the send queue, if there is one. 249 * 250 * c_xmit_rm holds a ref while we're sending this message down 251 * the connction. We can use this ref while holding the 252 * send_sem.. rdsv3_send_reset() is serialized with it. 253 */ 254 if (rm == NULL) { 255 unsigned int len; 256 257 mutex_enter(&conn->c_lock); 258 259 if (!list_is_empty(&conn->c_send_queue)) { 260 rm = list_remove_head(&conn->c_send_queue); 261 rdsv3_message_addref(rm); 262 263 /* 264 * Move the message from the send queue to 265 * the retransmit 266 * list right away. 267 */ 268 list_insert_tail(&conn->c_retrans, rm); 269 } 270 271 mutex_exit(&conn->c_lock); 272 273 if (rm == NULL) { 274 was_empty = 1; 275 break; 276 } 277 278 /* 279 * Unfortunately, the way Infiniband deals with 280 * RDMA to a bad MR key is by moving the entire 281 * queue pair to error state. We cold possibly 282 * recover from that, but right now we drop the 283 * connection. 284 * Therefore, we never retransmit messages with 285 * RDMA ops. 286 */ 287 if (rm->m_rdma_op && 288 test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) { 289 mutex_enter(&conn->c_lock); 290 if (test_and_clear_bit(RDSV3_MSG_ON_CONN, 291 &rm->m_flags)) 292 list_remove_node(&rm->m_conn_item); 293 list_insert_tail(&to_be_dropped, rm); 294 mutex_exit(&conn->c_lock); 295 rdsv3_message_put(rm); 296 continue; 297 } 298 299 /* Require an ACK every once in a while */ 300 len = ntohl(rm->m_inc.i_hdr.h_len); 301 if (conn->c_unacked_packets == 0 || 302 conn->c_unacked_bytes < len) { 303 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 304 305 conn->c_unacked_packets = 306 rdsv3_sysctl_max_unacked_packets; 307 conn->c_unacked_bytes = 308 rdsv3_sysctl_max_unacked_bytes; 309 rdsv3_stats_inc(s_send_ack_required); 310 } else { 311 conn->c_unacked_bytes -= len; 312 conn->c_unacked_packets--; 313 } 314 315 conn->c_xmit_rm = rm; 316 } 317 318 /* 319 * Try and send an rdma message. Let's see if we can 320 * keep this simple and require that the transport either 321 * send the whole rdma or none of it. 322 */ 323 if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) { 324 ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op); 325 if (ret) 326 break; 327 conn->c_xmit_rdma_sent = 1; 328 /* 329 * The transport owns the mapped memory for now. 330 * You can't unmap it while it's on the send queue 331 */ 332 set_bit(RDSV3_MSG_MAPPED, &rm->m_flags); 333 } 334 335 if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) || 336 conn->c_xmit_sg < rm->m_nents) { 337 ret = conn->c_trans->xmit(conn, rm, 338 conn->c_xmit_hdr_off, 339 conn->c_xmit_sg, 340 conn->c_xmit_data_off); 341 if (ret <= 0) 342 break; 343 344 if (conn->c_xmit_hdr_off < 345 sizeof (struct rdsv3_header)) { 346 tmp = min(ret, 347 sizeof (struct rdsv3_header) - 348 conn->c_xmit_hdr_off); 349 conn->c_xmit_hdr_off += tmp; 350 ret -= tmp; 351 } 352 353 sg = &rm->m_sg[conn->c_xmit_sg]; 354 while (ret) { 355 tmp = min(ret, rdsv3_sg_len(sg) - 356 conn->c_xmit_data_off); 357 conn->c_xmit_data_off += tmp; 358 ret -= tmp; 359 if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) { 360 conn->c_xmit_data_off = 0; 361 sg++; 362 conn->c_xmit_sg++; 363 ASSERT(!(ret != 0 && 364 conn->c_xmit_sg == rm->m_nents)); 365 } 366 } 367 } 368 } 369 370 /* Nuke any messages we decided not to retransmit. */ 371 if (!list_is_empty(&to_be_dropped)) 372 rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); 373 374 if (conn->c_trans->xmit_complete) 375 conn->c_trans->xmit_complete(conn); 376 377 /* 378 * We might be racing with another sender who queued a message but 379 * backed off on noticing that we held the c_send_lock. If we check 380 * for queued messages after dropping the sem then either we'll 381 * see the queued message or the queuer will get the sem. If we 382 * notice the queued message then we trigger an immediate retry. 383 * 384 * We need to be careful only to do this when we stopped processing 385 * the send queue because it was empty. It's the only way we 386 * stop processing the loop when the transport hasn't taken 387 * responsibility for forward progress. 388 */ 389 mutex_exit(&conn->c_send_lock); 390 391 if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { 392 /* 393 * We exhausted the send quota, but there's work left to 394 * do. Return and (re-)schedule the send worker. 395 */ 396 ret = -EAGAIN; 397 } 398 399 atomic_dec_32(&conn->c_senders); 400 401 if (ret == 0 && was_empty) { 402 /* 403 * A simple bit test would be way faster than taking the 404 * spin lock 405 */ 406 mutex_enter(&conn->c_lock); 407 if (!list_is_empty(&conn->c_send_queue)) { 408 rdsv3_stats_inc(s_send_sem_queue_raced); 409 ret = -EAGAIN; 410 } 411 mutex_exit(&conn->c_lock); 412 } 413 414 out: 415 RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)", 416 conn, ret); 417 return (ret); 418 } 419 420 static void 421 rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm) 422 { 423 uint32_t len = ntohl(rm->m_inc.i_hdr.h_len); 424 425 ASSERT(mutex_owned(&rs->rs_lock)); 426 427 ASSERT(rs->rs_snd_bytes >= len); 428 rs->rs_snd_bytes -= len; 429 430 if (rs->rs_snd_bytes == 0) 431 rdsv3_stats_inc(s_send_queue_empty); 432 } 433 434 static inline int 435 rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack, 436 is_acked_func is_acked) 437 { 438 if (is_acked) 439 return (is_acked(rm, ack)); 440 return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack); 441 } 442 443 /* 444 * Returns true if there are no messages on the send and retransmit queues 445 * which have a sequence number greater than or equal to the given sequence 446 * number. 447 */ 448 int 449 rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq) 450 { 451 struct rdsv3_message *rm; 452 int ret = 1; 453 454 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn); 455 456 mutex_enter(&conn->c_lock); 457 458 /* XXX - original code spits out warning */ 459 rm = list_head(&conn->c_retrans); 460 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 461 ret = 0; 462 463 /* XXX - original code spits out warning */ 464 rm = list_head(&conn->c_send_queue); 465 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 466 ret = 0; 467 468 mutex_exit(&conn->c_lock); 469 470 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn); 471 472 return (ret); 473 } 474 475 /* 476 * This is pretty similar to what happens below in the ACK 477 * handling code - except that we call here as soon as we get 478 * the IB send completion on the RDMA op and the accompanying 479 * message. 480 */ 481 void 482 rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status) 483 { 484 struct rdsv3_sock *rs = NULL; 485 struct rdsv3_rdma_op *ro; 486 struct rdsv3_notifier *notifier; 487 488 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm); 489 490 mutex_enter(&rm->m_rs_lock); 491 492 ro = rm->m_rdma_op; 493 if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) && 494 ro && ro->r_notify && ro->r_notifier) { 495 notifier = ro->r_notifier; 496 rs = rm->m_rs; 497 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs)); 498 499 notifier->n_status = status; 500 mutex_enter(&rs->rs_lock); 501 list_insert_tail(&rs->rs_notify_queue, notifier); 502 mutex_exit(&rs->rs_lock); 503 ro->r_notifier = NULL; 504 } 505 506 mutex_exit(&rm->m_rs_lock); 507 508 if (rs) { 509 struct rsock *sk = rdsv3_rs_to_sk(rs); 510 int error; 511 512 rdsv3_wake_sk_sleep(rs); 513 514 /* wake up anyone waiting in poll */ 515 sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL, 516 0, 0, &error, NULL); 517 if (error != 0) { 518 RDSV3_DPRINTF2("rdsv3_recv_incoming", 519 "su_recv returned: %d", error); 520 } 521 522 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 523 } 524 525 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm); 526 } 527 528 /* 529 * This is the same as rdsv3_rdma_send_complete except we 530 * don't do any locking - we have all the ingredients (message, 531 * socket, socket lock) and can just move the notifier. 532 */ 533 static inline void 534 __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm, 535 int status) 536 { 537 struct rdsv3_rdma_op *ro; 538 void *ic; 539 540 RDSV3_DPRINTF4("__rdsv3_rdma_send_complete", 541 "Enter(rs: %p, rm: %p)", rs, rm); 542 543 ro = rm->m_rdma_op; 544 if (ro && ro->r_notify && ro->r_notifier) { 545 ro->r_notifier->n_status = status; 546 list_insert_tail(&rs->rs_notify_queue, ro->r_notifier); 547 ro->r_notifier = NULL; 548 } 549 550 /* No need to wake the app - caller does this */ 551 } 552 553 /* 554 * This is called from the IB send completion when we detect 555 * a RDMA operation that failed with remote access error. 556 * So speed is not an issue here. 557 */ 558 struct rdsv3_message * 559 rdsv3_send_get_message(struct rdsv3_connection *conn, 560 struct rdsv3_rdma_op *op) 561 { 562 struct rdsv3_message *rm, *tmp, *found = NULL; 563 564 RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn); 565 566 mutex_enter(&conn->c_lock); 567 568 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 569 if (rm->m_rdma_op == op) { 570 atomic_add_32(&rm->m_refcount, 1); 571 found = rm; 572 goto out; 573 } 574 } 575 576 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue, 577 m_conn_item) { 578 if (rm->m_rdma_op == op) { 579 atomic_add_32(&rm->m_refcount, 1); 580 found = rm; 581 break; 582 } 583 } 584 585 out: 586 mutex_exit(&conn->c_lock); 587 588 return (found); 589 } 590 591 /* 592 * This removes messages from the socket's list if they're on it. The list 593 * argument must be private to the caller, we must be able to modify it 594 * without locks. The messages must have a reference held for their 595 * position on the list. This function will drop that reference after 596 * removing the messages from the 'messages' list regardless of if it found 597 * the messages on the socket list or not. 598 */ 599 void 600 rdsv3_send_remove_from_sock(struct list *messages, int status) 601 { 602 struct rdsv3_sock *rs = NULL; 603 struct rdsv3_message *rm; 604 605 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter"); 606 607 while (!list_is_empty(messages)) { 608 int was_on_sock = 0; 609 rm = list_remove_head(messages); 610 611 /* 612 * If we see this flag cleared then we're *sure* that someone 613 * else beat us to removing it from the sock. If we race 614 * with their flag update we'll get the lock and then really 615 * see that the flag has been cleared. 616 * 617 * The message spinlock makes sure nobody clears rm->m_rs 618 * while we're messing with it. It does not prevent the 619 * message from being removed from the socket, though. 620 */ 621 mutex_enter(&rm->m_rs_lock); 622 if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) 623 goto unlock_and_drop; 624 625 if (rs != rm->m_rs) { 626 if (rs) { 627 rdsv3_wake_sk_sleep(rs); 628 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 629 } 630 rs = rm->m_rs; 631 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs)); 632 } 633 634 mutex_enter(&rs->rs_lock); 635 if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) { 636 struct rdsv3_rdma_op *ro = rm->m_rdma_op; 637 struct rdsv3_notifier *notifier; 638 639 list_remove_node(&rm->m_sock_item); 640 rdsv3_send_sndbuf_remove(rs, rm); 641 if (ro && ro->r_notifier && 642 (status || ro->r_notify)) { 643 notifier = ro->r_notifier; 644 list_insert_tail(&rs->rs_notify_queue, 645 notifier); 646 if (!notifier->n_status) 647 notifier->n_status = status; 648 rm->m_rdma_op->r_notifier = NULL; 649 } 650 was_on_sock = 1; 651 rm->m_rs = NULL; 652 } 653 mutex_exit(&rs->rs_lock); 654 655 unlock_and_drop: 656 mutex_exit(&rm->m_rs_lock); 657 rdsv3_message_put(rm); 658 if (was_on_sock) 659 rdsv3_message_put(rm); 660 } 661 662 if (rs) { 663 rdsv3_wake_sk_sleep(rs); 664 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 665 } 666 667 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return"); 668 } 669 670 /* 671 * Transports call here when they've determined that the receiver queued 672 * messages up to, and including, the given sequence number. Messages are 673 * moved to the retrans queue when rdsv3_send_xmit picks them off the send 674 * queue. This means that in the TCP case, the message may not have been 675 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked 676 * checks the RDSV3_MSG_HAS_ACK_SEQ bit. 677 * 678 * XXX It's not clear to me how this is safely serialized with socket 679 * destruction. Maybe it should bail if it sees SOCK_DEAD. 680 */ 681 void 682 rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack, 683 is_acked_func is_acked) 684 { 685 struct rdsv3_message *rm, *tmp; 686 list_t list; 687 688 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn); 689 690 list_create(&list, sizeof (struct rdsv3_message), 691 offsetof(struct rdsv3_message, m_conn_item)); 692 693 mutex_enter(&conn->c_lock); 694 695 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 696 if (!rdsv3_send_is_acked(rm, ack, is_acked)) 697 break; 698 699 list_remove_node(&rm->m_conn_item); 700 list_insert_tail(&list, rm); 701 clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 702 } 703 704 #if 0 705 XXX 706 /* order flag updates with spin locks */ 707 if (!list_is_empty(&list)) 708 smp_mb__after_clear_bit(); 709 #endif 710 711 mutex_exit(&conn->c_lock); 712 713 /* now remove the messages from the sock list as needed */ 714 rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); 715 716 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn); 717 } 718 719 void 720 rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest) 721 { 722 struct rdsv3_message *rm, *tmp; 723 struct rdsv3_connection *conn; 724 list_t list; 725 int wake = 0; 726 727 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs); 728 729 list_create(&list, sizeof (struct rdsv3_message), 730 offsetof(struct rdsv3_message, m_sock_item)); 731 732 /* get all the messages we're dropping under the rs lock */ 733 mutex_enter(&rs->rs_lock); 734 735 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue, 736 m_sock_item) { 737 if (dest && (dest->sin_addr.s_addr != rm->m_daddr || 738 dest->sin_port != rm->m_inc.i_hdr.h_dport)) 739 continue; 740 wake = 1; 741 list_remove(&rs->rs_send_queue, rm); 742 list_insert_tail(&list, rm); 743 rdsv3_send_sndbuf_remove(rs, rm); 744 clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags); 745 } 746 747 mutex_exit(&rs->rs_lock); 748 749 conn = NULL; 750 751 /* now remove the messages from the conn list as needed */ 752 RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) { 753 /* 754 * We do this here rather than in the loop above, so that 755 * we don't have to nest m_rs_lock under rs->rs_lock 756 */ 757 mutex_enter(&rm->m_rs_lock); 758 /* If this is a RDMA operation, notify the app. */ 759 __rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); 760 rm->m_rs = NULL; 761 mutex_exit(&rm->m_rs_lock); 762 763 /* 764 * If we see this flag cleared then we're *sure* that someone 765 * else beat us to removing it from the conn. If we race 766 * with their flag update we'll get the lock and then really 767 * see that the flag has been cleared. 768 */ 769 if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) 770 continue; 771 772 if (conn != rm->m_inc.i_conn) { 773 if (conn) 774 mutex_exit(&conn->c_lock); 775 conn = rm->m_inc.i_conn; 776 mutex_enter(&conn->c_lock); 777 } 778 779 if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) { 780 list_remove_node(&rm->m_conn_item); 781 rdsv3_message_put(rm); 782 } 783 } 784 785 if (conn) 786 mutex_exit(&conn->c_lock); 787 788 if (wake) 789 rdsv3_wake_sk_sleep(rs); 790 791 while (!list_is_empty(&list)) { 792 rm = list_remove_head(&list); 793 794 rdsv3_message_wait(rm); 795 rdsv3_message_put(rm); 796 } 797 798 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs); 799 } 800 801 /* 802 * we only want this to fire once so we use the callers 'queued'. It's 803 * possible that another thread can race with us and remove the 804 * message from the flow with RDSV3_CANCEL_SENT_TO. 805 */ 806 static int 807 rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn, 808 struct rdsv3_message *rm, uint16_be_t sport, 809 uint16_be_t dport, int *queued) 810 { 811 uint32_t len; 812 813 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm); 814 815 if (*queued) 816 goto out; 817 818 len = ntohl(rm->m_inc.i_hdr.h_len); 819 820 /* 821 * this is the only place which holds both the socket's rs_lock 822 * and the connection's c_lock 823 */ 824 mutex_enter(&rs->rs_lock); 825 826 /* 827 * If there is a little space in sndbuf, we don't queue anything, 828 * and userspace gets -EAGAIN. But poll() indicates there's send 829 * room. This can lead to bad behavior (spinning) if snd_bytes isn't 830 * freed up by incoming acks. So we check the *old* value of 831 * rs_snd_bytes here to allow the last msg to exceed the buffer, 832 * and poll() now knows no more data can be sent. 833 */ 834 if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) { 835 rs->rs_snd_bytes += len; 836 837 /* 838 * let recv side know we are close to send space exhaustion. 839 * This is probably not the optimal way to do it, as this 840 * means we set the flag on *all* messages as soon as our 841 * throughput hits a certain threshold. 842 */ 843 if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2) 844 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 845 846 list_insert_tail(&rs->rs_send_queue, rm); 847 set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags); 848 849 rdsv3_message_addref(rm); 850 rm->m_rs = rs; 851 852 /* 853 * The code ordering is a little weird, but we're 854 * trying to minimize the time we hold c_lock 855 */ 856 rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport, 857 dport, 0); 858 rm->m_inc.i_conn = conn; 859 rdsv3_message_addref(rm); /* XXX - called twice */ 860 861 mutex_enter(&conn->c_lock); 862 rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++); 863 list_insert_tail(&conn->c_send_queue, rm); 864 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 865 mutex_exit(&conn->c_lock); 866 867 RDSV3_DPRINTF5("rdsv3_send_queue_rm", 868 "queued msg %p len %d, rs %p bytes %d seq %llu", 869 rm, len, rs, rs->rs_snd_bytes, 870 (unsigned long long)ntohll( 871 rm->m_inc.i_hdr.h_sequence)); 872 873 *queued = 1; 874 } 875 876 mutex_exit(&rs->rs_lock); 877 878 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs); 879 out: 880 return (*queued); 881 } 882 883 static int 884 rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm, 885 struct msghdr *msg, int *allocated_mr) 886 { 887 struct cmsghdr *cmsg; 888 int ret = 0; 889 890 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs); 891 892 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 893 894 if (cmsg->cmsg_level != SOL_RDS) 895 continue; 896 897 RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d", 898 cmsg, rm, cmsg->cmsg_type); 899 /* 900 * As a side effect, RDMA_DEST and RDMA_MAP will set 901 * rm->m_rdma_cookie and rm->m_rdma_mr. 902 */ 903 switch (cmsg->cmsg_type) { 904 case RDS_CMSG_RDMA_ARGS: 905 ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg); 906 break; 907 908 case RDS_CMSG_RDMA_DEST: 909 ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg); 910 break; 911 912 case RDS_CMSG_RDMA_MAP: 913 ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg); 914 if (ret) 915 *allocated_mr = 1; 916 break; 917 918 default: 919 return (-EINVAL); 920 } 921 922 if (ret) 923 break; 924 } 925 926 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs); 927 928 return (ret); 929 } 930 931 extern unsigned long rdsv3_max_bcopy_size; 932 933 int 934 rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg, 935 size_t payload_len) 936 { 937 struct rsock *sk = rdsv3_rs_to_sk(rs); 938 struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name; 939 uint32_be_t daddr; 940 uint16_be_t dport; 941 struct rdsv3_message *rm = NULL; 942 struct rdsv3_connection *conn; 943 int ret = 0; 944 int queued = 0, allocated_mr = 0; 945 int nonblock = msg->msg_flags & MSG_DONTWAIT; 946 long timeo = rdsv3_sndtimeo(sk, nonblock); 947 948 RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs); 949 950 if (msg->msg_namelen) { 951 /* XXX fail non-unicast destination IPs? */ 952 if (msg->msg_namelen < sizeof (*usin) || 953 usin->sin_family != AF_INET_OFFLOAD) { 954 ret = -EINVAL; 955 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 956 goto out; 957 } 958 daddr = usin->sin_addr.s_addr; 959 dport = usin->sin_port; 960 } else { 961 /* We only care about consistency with ->connect() */ 962 mutex_enter(&sk->sk_lock); 963 daddr = rs->rs_conn_addr; 964 dport = rs->rs_conn_port; 965 mutex_exit(&sk->sk_lock); 966 } 967 968 /* racing with another thread binding seems ok here */ 969 if (daddr == 0 || rs->rs_bound_addr == 0) { 970 ret = -ENOTCONN; /* XXX not a great errno */ 971 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 972 goto out; 973 } 974 975 if (payload_len > rdsv3_max_bcopy_size) { 976 RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d", 977 payload_len); 978 ret = -EMSGSIZE; 979 goto out; 980 } 981 982 rm = rdsv3_message_copy_from_user(uio, payload_len); 983 if (IS_ERR(rm)) { 984 ret = PTR_ERR(rm); 985 RDSV3_DPRINTF2("rdsv3_sendmsg", 986 "rdsv3_message_copy_from_user failed %d", -ret); 987 rm = NULL; 988 goto out; 989 } 990 991 rm->m_daddr = daddr; 992 993 /* Parse any control messages the user may have included. */ 994 ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr); 995 if (ret) { 996 RDSV3_DPRINTF2("rdsv3_sendmsg", 997 "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d", 998 rs, rm, msg, ret); 999 goto out; 1000 } 1001 1002 /* 1003 * rdsv3_conn_create has a spinlock that runs with IRQ off. 1004 * Caching the conn in the socket helps a lot. 1005 */ 1006 mutex_enter(&rs->rs_conn_lock); 1007 if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) { 1008 conn = rs->rs_conn; 1009 } else { 1010 conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr, 1011 daddr, rs->rs_transport, KM_NOSLEEP); 1012 if (IS_ERR(conn)) { 1013 mutex_exit(&rs->rs_conn_lock); 1014 ret = PTR_ERR(conn); 1015 RDSV3_DPRINTF2("rdsv3_sendmsg", 1016 "rdsv3_conn_create_outgoing failed %d", 1017 -ret); 1018 goto out; 1019 } 1020 rs->rs_conn = conn; 1021 } 1022 mutex_exit(&rs->rs_conn_lock); 1023 1024 if ((rm->m_rdma_cookie || rm->m_rdma_op) && 1025 conn->c_trans->xmit_rdma == NULL) { 1026 RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p", 1027 rm->m_rdma_op, conn->c_trans->xmit_rdma); 1028 ret = -EOPNOTSUPP; 1029 goto out; 1030 } 1031 1032 /* 1033 * If the connection is down, trigger a connect. We may 1034 * have scheduled a delayed reconnect however - in this case 1035 * we should not interfere. 1036 */ 1037 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1038 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1039 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1040 1041 ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs); 1042 if (ret) { 1043 mutex_enter(&rs->rs_congested_lock); 1044 rs->rs_seen_congestion = 1; 1045 cv_signal(&rs->rs_congested_cv); 1046 mutex_exit(&rs->rs_congested_lock); 1047 1048 RDSV3_DPRINTF2("rdsv3_sendmsg", 1049 "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret); 1050 goto out; 1051 } 1052 1053 (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport, 1054 &queued); 1055 if (!queued) { 1056 /* rdsv3_stats_inc(s_send_queue_full); */ 1057 /* XXX make sure this is reasonable */ 1058 if (payload_len > rdsv3_sk_sndbuf(rs)) { 1059 ret = -EMSGSIZE; 1060 RDSV3_DPRINTF2("rdsv3_sendmsg", 1061 "msgsize(%d) too big, returning: %d", 1062 payload_len, -ret); 1063 goto out; 1064 } 1065 if (nonblock) { 1066 ret = -EAGAIN; 1067 RDSV3_DPRINTF3("rdsv3_sendmsg", 1068 "send queue full (%d), returning: %d", 1069 payload_len, -ret); 1070 goto out; 1071 } 1072 1073 #if 0 1074 ret = rdsv3_wait_sig(sk->sk_sleep, 1075 (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1076 dport, &queued))); 1077 if (ret == 0) { 1078 /* signal/timeout pending */ 1079 RDSV3_DPRINTF2("rdsv3_sendmsg", 1080 "woke due to signal: %d", ret); 1081 ret = -ERESTART; 1082 goto out; 1083 } 1084 #else 1085 mutex_enter(&sk->sk_sleep->waitq_mutex); 1086 sk->sk_sleep->waitq_waiters++; 1087 while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1088 dport, &queued)) { 1089 ret = cv_wait_sig(&sk->sk_sleep->waitq_cv, 1090 &sk->sk_sleep->waitq_mutex); 1091 if (ret == 0) { 1092 /* signal/timeout pending */ 1093 RDSV3_DPRINTF2("rdsv3_sendmsg", 1094 "woke due to signal: %d", ret); 1095 ret = -EINTR; 1096 sk->sk_sleep->waitq_waiters--; 1097 mutex_exit(&sk->sk_sleep->waitq_mutex); 1098 goto out; 1099 } 1100 } 1101 sk->sk_sleep->waitq_waiters--; 1102 mutex_exit(&sk->sk_sleep->waitq_mutex); 1103 #endif 1104 1105 RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d", 1106 queued); 1107 1108 ASSERT(queued); 1109 ret = 0; 1110 } 1111 1112 /* 1113 * By now we've committed to the send. We reuse rdsv3_send_worker() 1114 * to retry sends in the rds thread if the transport asks us to. 1115 */ 1116 rdsv3_stats_inc(s_send_queued); 1117 1118 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1119 (void) rdsv3_send_worker(&conn->c_send_w.work); 1120 1121 rdsv3_message_put(rm); 1122 RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)", 1123 rs, payload_len); 1124 return (payload_len); 1125 1126 out: 1127 /* 1128 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. 1129 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN 1130 * or in any other way, we need to destroy the MR again 1131 */ 1132 if (allocated_mr) 1133 rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie), 1134 1); 1135 1136 if (rm) 1137 rdsv3_message_put(rm); 1138 return (ret); 1139 } 1140 1141 /* 1142 * Reply to a ping packet. 1143 */ 1144 int 1145 rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport) 1146 { 1147 struct rdsv3_message *rm; 1148 int ret = 0; 1149 1150 RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn); 1151 1152 rm = rdsv3_message_alloc(0, KM_NOSLEEP); 1153 if (!rm) { 1154 ret = -ENOMEM; 1155 goto out; 1156 } 1157 1158 rm->m_daddr = conn->c_faddr; 1159 1160 /* 1161 * If the connection is down, trigger a connect. We may 1162 * have scheduled a delayed reconnect however - in this case 1163 * we should not interfere. 1164 */ 1165 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1166 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1167 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1168 1169 ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL); 1170 if (ret) 1171 goto out; 1172 1173 mutex_enter(&conn->c_lock); 1174 list_insert_tail(&conn->c_send_queue, rm); 1175 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 1176 rdsv3_message_addref(rm); 1177 rm->m_inc.i_conn = conn; 1178 1179 rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport, 1180 conn->c_next_tx_seq); 1181 conn->c_next_tx_seq++; 1182 mutex_exit(&conn->c_lock); 1183 1184 rdsv3_stats_inc(s_send_queued); 1185 rdsv3_stats_inc(s_send_pong); 1186 1187 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1188 (void) rdsv3_send_xmit(conn); 1189 1190 rdsv3_message_put(rm); 1191 1192 RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn); 1193 return (0); 1194 1195 out: 1196 if (rm) 1197 rdsv3_message_put(rm); 1198 return (ret); 1199 } 1200