1 /* 2 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 3 */ 4 5 /* 6 * This file contains code imported from the OFED rds source file send.c 7 * Oracle elects to have and use the contents of send.c under and governed 8 * by the OpenIB.org BSD license (see below for full license text). However, 9 * the following notice accompanied the original version of this file: 10 */ 11 12 /* 13 * Copyright (c) 2006 Oracle. All rights reserved. 14 * 15 * This software is available to you under a choice of one of two 16 * licenses. You may choose to be licensed under the terms of the GNU 17 * General Public License (GPL) Version 2, available from the file 18 * COPYING in the main directory of this source tree, or the 19 * OpenIB.org BSD license below: 20 * 21 * Redistribution and use in source and binary forms, with or 22 * without modification, are permitted provided that the following 23 * conditions are met: 24 * 25 * - Redistributions of source code must retain the above 26 * copyright notice, this list of conditions and the following 27 * disclaimer. 28 * 29 * - Redistributions in binary form must reproduce the above 30 * copyright notice, this list of conditions and the following 31 * disclaimer in the documentation and/or other materials 32 * provided with the distribution. 33 * 34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 35 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 36 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 37 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 38 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 39 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 40 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 41 * SOFTWARE. 42 * 43 */ 44 #include <sys/stropts.h> 45 #include <sys/systm.h> 46 47 #include <sys/rds.h> 48 #include <sys/socket.h> 49 #include <sys/socketvar.h> 50 51 #include <sys/ib/clients/rdsv3/rdsv3.h> 52 #include <sys/ib/clients/rdsv3/rdma.h> 53 #include <sys/ib/clients/rdsv3/rdsv3_debug.h> 54 55 /* 56 * When transmitting messages in rdsv3_send_xmit, we need to emerge from 57 * time to time and briefly release the CPU. Otherwise the softlock watchdog 58 * will kick our shin. 59 * Also, it seems fairer to not let one busy connection stall all the 60 * others. 61 * 62 * send_batch_count is the number of times we'll loop in send_xmit. Setting 63 * it to 0 will restore the old behavior (where we looped until we had 64 * drained the queue). 65 */ 66 static int send_batch_count = 64; 67 68 extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op); 69 /* 70 * Reset the send state. Caller must hold c_send_lock when calling here. 71 */ 72 void 73 rdsv3_send_reset(struct rdsv3_connection *conn) 74 { 75 struct rdsv3_message *rm, *tmp; 76 struct rdsv3_rdma_op *ro; 77 78 RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn); 79 80 ASSERT(MUTEX_HELD(&conn->c_send_lock)); 81 82 if (conn->c_xmit_rm) { 83 rm = conn->c_xmit_rm; 84 ro = rm->m_rdma_op; 85 if (ro && ro->r_mapped) { 86 RDSV3_DPRINTF2("rdsv3_send_reset", 87 "rm %p mflg 0x%x map %d mihdl %p sgl %p", 88 rm, rm->m_flags, ro->r_mapped, 89 ro->r_rdma_sg[0].mihdl, 90 ro->r_rdma_sg[0].swr.wr_sgl); 91 rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro); 92 } 93 /* 94 * Tell the user the RDMA op is no longer mapped by the 95 * transport. This isn't entirely true (it's flushed out 96 * independently) but as the connection is down, there's 97 * no ongoing RDMA to/from that memory 98 */ 99 rdsv3_message_unmapped(conn->c_xmit_rm); 100 rdsv3_message_put(conn->c_xmit_rm); 101 conn->c_xmit_rm = NULL; 102 } 103 104 conn->c_xmit_sg = 0; 105 conn->c_xmit_hdr_off = 0; 106 conn->c_xmit_data_off = 0; 107 conn->c_xmit_rdma_sent = 0; 108 conn->c_map_queued = 0; 109 110 conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets; 111 conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes; 112 113 /* Mark messages as retransmissions, and move them to the send q */ 114 mutex_enter(&conn->c_lock); 115 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 116 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 117 set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags); 118 if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) { 119 RDSV3_DPRINTF4("_send_reset", 120 "RT rm %p mflg 0x%x sgl %p", 121 rm, rm->m_flags, 122 rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl); 123 } 124 } 125 list_move_tail(&conn->c_send_queue, &conn->c_retrans); 126 mutex_exit(&conn->c_lock); 127 128 RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn); 129 } 130 131 /* 132 * We're making the concious trade-off here to only send one message 133 * down the connection at a time. 134 * Pro: 135 * - tx queueing is a simple fifo list 136 * - reassembly is optional and easily done by transports per conn 137 * - no per flow rx lookup at all, straight to the socket 138 * - less per-frag memory and wire overhead 139 * Con: 140 * - queued acks can be delayed behind large messages 141 * Depends: 142 * - small message latency is higher behind queued large messages 143 * - large message latency isn't starved by intervening small sends 144 */ 145 int 146 rdsv3_send_xmit(struct rdsv3_connection *conn) 147 { 148 struct rdsv3_message *rm; 149 unsigned int tmp; 150 unsigned int send_quota = send_batch_count; 151 struct rdsv3_scatterlist *sg; 152 int ret = 0; 153 int was_empty = 0; 154 list_t to_be_dropped; 155 156 if (!rdsv3_conn_up(conn)) 157 goto out; 158 159 RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn); 160 161 list_create(&to_be_dropped, sizeof (struct rdsv3_message), 162 offsetof(struct rdsv3_message, m_conn_item)); 163 164 /* 165 * sendmsg calls here after having queued its message on the send 166 * queue. We only have one task feeding the connection at a time. If 167 * another thread is already feeding the queue then we back off. This 168 * avoids blocking the caller and trading per-connection data between 169 * caches per message. 170 */ 171 if (!mutex_tryenter(&conn->c_send_lock)) { 172 RDSV3_DPRINTF4("rdsv3_send_xmit", 173 "Another thread running(conn: %p)", conn); 174 rdsv3_stats_inc(s_send_sem_contention); 175 ret = -ENOMEM; 176 goto out; 177 } 178 atomic_inc_32(&conn->c_senders); 179 180 if (conn->c_trans->xmit_prepare) 181 conn->c_trans->xmit_prepare(conn); 182 183 /* 184 * spin trying to push headers and data down the connection until 185 * the connection doesn't make forward progress. 186 */ 187 while (--send_quota) { 188 /* 189 * See if need to send a congestion map update if we're 190 * between sending messages. The send_sem protects our sole 191 * use of c_map_offset and _bytes. 192 * Note this is used only by transports that define a special 193 * xmit_cong_map function. For all others, we create allocate 194 * a cong_map message and treat it just like any other send. 195 */ 196 if (conn->c_map_bytes) { 197 ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, 198 conn->c_map_offset); 199 if (ret <= 0) 200 break; 201 202 conn->c_map_offset += ret; 203 conn->c_map_bytes -= ret; 204 if (conn->c_map_bytes) 205 continue; 206 } 207 208 /* 209 * If we're done sending the current message, clear the 210 * offset and S/G temporaries. 211 */ 212 rm = conn->c_xmit_rm; 213 if (rm != NULL && 214 conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) && 215 conn->c_xmit_sg == rm->m_nents) { 216 conn->c_xmit_rm = NULL; 217 conn->c_xmit_sg = 0; 218 conn->c_xmit_hdr_off = 0; 219 conn->c_xmit_data_off = 0; 220 conn->c_xmit_rdma_sent = 0; 221 222 /* Release the reference to the previous message. */ 223 rdsv3_message_put(rm); 224 rm = NULL; 225 } 226 227 /* If we're asked to send a cong map update, do so. */ 228 if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) { 229 if (conn->c_trans->xmit_cong_map != NULL) { 230 conn->c_map_offset = 0; 231 conn->c_map_bytes = 232 sizeof (struct rdsv3_header) + 233 RDSV3_CONG_MAP_BYTES; 234 continue; 235 } 236 237 rm = rdsv3_cong_update_alloc(conn); 238 if (IS_ERR(rm)) { 239 ret = PTR_ERR(rm); 240 break; 241 } 242 243 conn->c_xmit_rm = rm; 244 } 245 246 /* 247 * Grab the next message from the send queue, if there is one. 248 * 249 * c_xmit_rm holds a ref while we're sending this message down 250 * the connction. We can use this ref while holding the 251 * send_sem.. rdsv3_send_reset() is serialized with it. 252 */ 253 if (rm == NULL) { 254 unsigned int len; 255 256 mutex_enter(&conn->c_lock); 257 258 if (!list_is_empty(&conn->c_send_queue)) { 259 rm = list_remove_head(&conn->c_send_queue); 260 rdsv3_message_addref(rm); 261 262 /* 263 * Move the message from the send queue to 264 * the retransmit 265 * list right away. 266 */ 267 list_insert_tail(&conn->c_retrans, rm); 268 } 269 270 mutex_exit(&conn->c_lock); 271 272 if (rm == NULL) { 273 was_empty = 1; 274 break; 275 } 276 277 /* 278 * Unfortunately, the way Infiniband deals with 279 * RDMA to a bad MR key is by moving the entire 280 * queue pair to error state. We cold possibly 281 * recover from that, but right now we drop the 282 * connection. 283 * Therefore, we never retransmit messages with 284 * RDMA ops. 285 */ 286 if (rm->m_rdma_op && 287 test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) { 288 mutex_enter(&conn->c_lock); 289 if (test_and_clear_bit(RDSV3_MSG_ON_CONN, 290 &rm->m_flags)) 291 list_remove_node(&rm->m_conn_item); 292 list_insert_tail(&to_be_dropped, rm); 293 mutex_exit(&conn->c_lock); 294 rdsv3_message_put(rm); 295 continue; 296 } 297 298 /* Require an ACK every once in a while */ 299 len = ntohl(rm->m_inc.i_hdr.h_len); 300 if (conn->c_unacked_packets == 0 || 301 conn->c_unacked_bytes < len) { 302 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 303 304 conn->c_unacked_packets = 305 rdsv3_sysctl_max_unacked_packets; 306 conn->c_unacked_bytes = 307 rdsv3_sysctl_max_unacked_bytes; 308 rdsv3_stats_inc(s_send_ack_required); 309 } else { 310 conn->c_unacked_bytes -= len; 311 conn->c_unacked_packets--; 312 } 313 314 conn->c_xmit_rm = rm; 315 } 316 317 /* 318 * Try and send an rdma message. Let's see if we can 319 * keep this simple and require that the transport either 320 * send the whole rdma or none of it. 321 */ 322 if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) { 323 ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op); 324 if (ret) 325 break; 326 conn->c_xmit_rdma_sent = 1; 327 /* 328 * The transport owns the mapped memory for now. 329 * You can't unmap it while it's on the send queue 330 */ 331 set_bit(RDSV3_MSG_MAPPED, &rm->m_flags); 332 } 333 334 if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) || 335 conn->c_xmit_sg < rm->m_nents) { 336 ret = conn->c_trans->xmit(conn, rm, 337 conn->c_xmit_hdr_off, 338 conn->c_xmit_sg, 339 conn->c_xmit_data_off); 340 if (ret <= 0) 341 break; 342 343 if (conn->c_xmit_hdr_off < 344 sizeof (struct rdsv3_header)) { 345 tmp = min(ret, 346 sizeof (struct rdsv3_header) - 347 conn->c_xmit_hdr_off); 348 conn->c_xmit_hdr_off += tmp; 349 ret -= tmp; 350 } 351 352 sg = &rm->m_sg[conn->c_xmit_sg]; 353 while (ret) { 354 tmp = min(ret, rdsv3_sg_len(sg) - 355 conn->c_xmit_data_off); 356 conn->c_xmit_data_off += tmp; 357 ret -= tmp; 358 if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) { 359 conn->c_xmit_data_off = 0; 360 sg++; 361 conn->c_xmit_sg++; 362 ASSERT(!(ret != 0 && 363 conn->c_xmit_sg == rm->m_nents)); 364 } 365 } 366 } 367 } 368 369 /* Nuke any messages we decided not to retransmit. */ 370 if (!list_is_empty(&to_be_dropped)) 371 rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); 372 373 if (conn->c_trans->xmit_complete) 374 conn->c_trans->xmit_complete(conn); 375 376 /* 377 * We might be racing with another sender who queued a message but 378 * backed off on noticing that we held the c_send_lock. If we check 379 * for queued messages after dropping the sem then either we'll 380 * see the queued message or the queuer will get the sem. If we 381 * notice the queued message then we trigger an immediate retry. 382 * 383 * We need to be careful only to do this when we stopped processing 384 * the send queue because it was empty. It's the only way we 385 * stop processing the loop when the transport hasn't taken 386 * responsibility for forward progress. 387 */ 388 mutex_exit(&conn->c_send_lock); 389 390 if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { 391 /* 392 * We exhausted the send quota, but there's work left to 393 * do. Return and (re-)schedule the send worker. 394 */ 395 ret = -EAGAIN; 396 } 397 398 atomic_dec_32(&conn->c_senders); 399 400 if (ret == 0 && was_empty) { 401 /* 402 * A simple bit test would be way faster than taking the 403 * spin lock 404 */ 405 mutex_enter(&conn->c_lock); 406 if (!list_is_empty(&conn->c_send_queue)) { 407 rdsv3_stats_inc(s_send_sem_queue_raced); 408 ret = -EAGAIN; 409 } 410 mutex_exit(&conn->c_lock); 411 } 412 413 out: 414 RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)", 415 conn, ret); 416 return (ret); 417 } 418 419 static void 420 rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm) 421 { 422 uint32_t len = ntohl(rm->m_inc.i_hdr.h_len); 423 424 ASSERT(mutex_owned(&rs->rs_lock)); 425 426 ASSERT(rs->rs_snd_bytes >= len); 427 rs->rs_snd_bytes -= len; 428 429 if (rs->rs_snd_bytes == 0) 430 rdsv3_stats_inc(s_send_queue_empty); 431 } 432 433 static inline int 434 rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack, 435 is_acked_func is_acked) 436 { 437 if (is_acked) 438 return (is_acked(rm, ack)); 439 return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack); 440 } 441 442 /* 443 * Returns true if there are no messages on the send and retransmit queues 444 * which have a sequence number greater than or equal to the given sequence 445 * number. 446 */ 447 int 448 rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq) 449 { 450 struct rdsv3_message *rm; 451 int ret = 1; 452 453 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn); 454 455 mutex_enter(&conn->c_lock); 456 457 /* XXX - original code spits out warning */ 458 rm = list_head(&conn->c_retrans); 459 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 460 ret = 0; 461 462 /* XXX - original code spits out warning */ 463 rm = list_head(&conn->c_send_queue); 464 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 465 ret = 0; 466 467 mutex_exit(&conn->c_lock); 468 469 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn); 470 471 return (ret); 472 } 473 474 /* 475 * This is pretty similar to what happens below in the ACK 476 * handling code - except that we call here as soon as we get 477 * the IB send completion on the RDMA op and the accompanying 478 * message. 479 */ 480 void 481 rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status) 482 { 483 struct rdsv3_sock *rs = NULL; 484 struct rdsv3_rdma_op *ro; 485 struct rdsv3_notifier *notifier; 486 487 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm); 488 489 mutex_enter(&rm->m_rs_lock); 490 491 ro = rm->m_rdma_op; 492 if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) && 493 ro && ro->r_notify && ro->r_notifier) { 494 notifier = ro->r_notifier; 495 rs = rm->m_rs; 496 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs)); 497 498 notifier->n_status = status; 499 mutex_enter(&rs->rs_lock); 500 list_insert_tail(&rs->rs_notify_queue, notifier); 501 mutex_exit(&rs->rs_lock); 502 ro->r_notifier = NULL; 503 } 504 505 mutex_exit(&rm->m_rs_lock); 506 507 if (rs) { 508 struct rsock *sk = rdsv3_rs_to_sk(rs); 509 int error; 510 511 rdsv3_wake_sk_sleep(rs); 512 513 /* wake up anyone waiting in poll */ 514 sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL, 515 0, 0, &error, NULL); 516 if (error != 0) { 517 RDSV3_DPRINTF2("rdsv3_recv_incoming", 518 "su_recv returned: %d", error); 519 } 520 521 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 522 } 523 524 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm); 525 } 526 527 /* 528 * This is the same as rdsv3_rdma_send_complete except we 529 * don't do any locking - we have all the ingredients (message, 530 * socket, socket lock) and can just move the notifier. 531 */ 532 static inline void 533 __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm, 534 int status) 535 { 536 struct rdsv3_rdma_op *ro; 537 void *ic; 538 539 RDSV3_DPRINTF4("__rdsv3_rdma_send_complete", 540 "Enter(rs: %p, rm: %p)", rs, rm); 541 542 ro = rm->m_rdma_op; 543 if (ro && ro->r_notify && ro->r_notifier) { 544 ro->r_notifier->n_status = status; 545 list_insert_tail(&rs->rs_notify_queue, ro->r_notifier); 546 ro->r_notifier = NULL; 547 } 548 549 /* No need to wake the app - caller does this */ 550 } 551 552 /* 553 * This is called from the IB send completion when we detect 554 * a RDMA operation that failed with remote access error. 555 * So speed is not an issue here. 556 */ 557 struct rdsv3_message * 558 rdsv3_send_get_message(struct rdsv3_connection *conn, 559 struct rdsv3_rdma_op *op) 560 { 561 struct rdsv3_message *rm, *tmp, *found = NULL; 562 563 RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn); 564 565 mutex_enter(&conn->c_lock); 566 567 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 568 if (rm->m_rdma_op == op) { 569 atomic_inc_32(&rm->m_refcount); 570 found = rm; 571 goto out; 572 } 573 } 574 575 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue, 576 m_conn_item) { 577 if (rm->m_rdma_op == op) { 578 atomic_inc_32(&rm->m_refcount); 579 found = rm; 580 break; 581 } 582 } 583 584 out: 585 mutex_exit(&conn->c_lock); 586 587 return (found); 588 } 589 590 /* 591 * This removes messages from the socket's list if they're on it. The list 592 * argument must be private to the caller, we must be able to modify it 593 * without locks. The messages must have a reference held for their 594 * position on the list. This function will drop that reference after 595 * removing the messages from the 'messages' list regardless of if it found 596 * the messages on the socket list or not. 597 */ 598 void 599 rdsv3_send_remove_from_sock(struct list *messages, int status) 600 { 601 struct rdsv3_sock *rs = NULL; 602 struct rdsv3_message *rm; 603 604 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter"); 605 606 while (!list_is_empty(messages)) { 607 int was_on_sock = 0; 608 rm = list_remove_head(messages); 609 610 /* 611 * If we see this flag cleared then we're *sure* that someone 612 * else beat us to removing it from the sock. If we race 613 * with their flag update we'll get the lock and then really 614 * see that the flag has been cleared. 615 * 616 * The message spinlock makes sure nobody clears rm->m_rs 617 * while we're messing with it. It does not prevent the 618 * message from being removed from the socket, though. 619 */ 620 mutex_enter(&rm->m_rs_lock); 621 if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) 622 goto unlock_and_drop; 623 624 if (rs != rm->m_rs) { 625 if (rs) { 626 rdsv3_wake_sk_sleep(rs); 627 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 628 } 629 rs = rm->m_rs; 630 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs)); 631 } 632 633 mutex_enter(&rs->rs_lock); 634 if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) { 635 struct rdsv3_rdma_op *ro = rm->m_rdma_op; 636 struct rdsv3_notifier *notifier; 637 638 list_remove_node(&rm->m_sock_item); 639 rdsv3_send_sndbuf_remove(rs, rm); 640 if (ro && ro->r_notifier && 641 (status || ro->r_notify)) { 642 notifier = ro->r_notifier; 643 list_insert_tail(&rs->rs_notify_queue, 644 notifier); 645 if (!notifier->n_status) 646 notifier->n_status = status; 647 rm->m_rdma_op->r_notifier = NULL; 648 } 649 was_on_sock = 1; 650 rm->m_rs = NULL; 651 } 652 mutex_exit(&rs->rs_lock); 653 654 unlock_and_drop: 655 mutex_exit(&rm->m_rs_lock); 656 rdsv3_message_put(rm); 657 if (was_on_sock) 658 rdsv3_message_put(rm); 659 } 660 661 if (rs) { 662 rdsv3_wake_sk_sleep(rs); 663 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 664 } 665 666 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return"); 667 } 668 669 /* 670 * Transports call here when they've determined that the receiver queued 671 * messages up to, and including, the given sequence number. Messages are 672 * moved to the retrans queue when rdsv3_send_xmit picks them off the send 673 * queue. This means that in the TCP case, the message may not have been 674 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked 675 * checks the RDSV3_MSG_HAS_ACK_SEQ bit. 676 * 677 * XXX It's not clear to me how this is safely serialized with socket 678 * destruction. Maybe it should bail if it sees SOCK_DEAD. 679 */ 680 void 681 rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack, 682 is_acked_func is_acked) 683 { 684 struct rdsv3_message *rm, *tmp; 685 list_t list; 686 687 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn); 688 689 list_create(&list, sizeof (struct rdsv3_message), 690 offsetof(struct rdsv3_message, m_conn_item)); 691 692 mutex_enter(&conn->c_lock); 693 694 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 695 if (!rdsv3_send_is_acked(rm, ack, is_acked)) 696 break; 697 698 list_remove_node(&rm->m_conn_item); 699 list_insert_tail(&list, rm); 700 clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 701 } 702 703 #if 0 704 XXX 705 /* order flag updates with spin locks */ 706 if (!list_is_empty(&list)) 707 smp_mb__after_clear_bit(); 708 #endif 709 710 mutex_exit(&conn->c_lock); 711 712 /* now remove the messages from the sock list as needed */ 713 rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); 714 715 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn); 716 } 717 718 void 719 rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest) 720 { 721 struct rdsv3_message *rm, *tmp; 722 struct rdsv3_connection *conn; 723 list_t list; 724 int wake = 0; 725 726 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs); 727 728 list_create(&list, sizeof (struct rdsv3_message), 729 offsetof(struct rdsv3_message, m_sock_item)); 730 731 /* get all the messages we're dropping under the rs lock */ 732 mutex_enter(&rs->rs_lock); 733 734 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue, 735 m_sock_item) { 736 if (dest && (dest->sin_addr.s_addr != rm->m_daddr || 737 dest->sin_port != rm->m_inc.i_hdr.h_dport)) 738 continue; 739 wake = 1; 740 list_remove(&rs->rs_send_queue, rm); 741 list_insert_tail(&list, rm); 742 rdsv3_send_sndbuf_remove(rs, rm); 743 clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags); 744 } 745 746 mutex_exit(&rs->rs_lock); 747 748 conn = NULL; 749 750 /* now remove the messages from the conn list as needed */ 751 RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) { 752 /* 753 * We do this here rather than in the loop above, so that 754 * we don't have to nest m_rs_lock under rs->rs_lock 755 */ 756 mutex_enter(&rm->m_rs_lock); 757 /* If this is a RDMA operation, notify the app. */ 758 __rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); 759 rm->m_rs = NULL; 760 mutex_exit(&rm->m_rs_lock); 761 762 /* 763 * If we see this flag cleared then we're *sure* that someone 764 * else beat us to removing it from the conn. If we race 765 * with their flag update we'll get the lock and then really 766 * see that the flag has been cleared. 767 */ 768 if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) 769 continue; 770 771 if (conn != rm->m_inc.i_conn) { 772 if (conn) 773 mutex_exit(&conn->c_lock); 774 conn = rm->m_inc.i_conn; 775 mutex_enter(&conn->c_lock); 776 } 777 778 if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) { 779 list_remove_node(&rm->m_conn_item); 780 rdsv3_message_put(rm); 781 } 782 } 783 784 if (conn) 785 mutex_exit(&conn->c_lock); 786 787 if (wake) 788 rdsv3_wake_sk_sleep(rs); 789 790 while (!list_is_empty(&list)) { 791 rm = list_remove_head(&list); 792 793 rdsv3_message_wait(rm); 794 rdsv3_message_put(rm); 795 } 796 797 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs); 798 } 799 800 /* 801 * we only want this to fire once so we use the callers 'queued'. It's 802 * possible that another thread can race with us and remove the 803 * message from the flow with RDSV3_CANCEL_SENT_TO. 804 */ 805 static int 806 rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn, 807 struct rdsv3_message *rm, uint16_be_t sport, 808 uint16_be_t dport, int *queued) 809 { 810 uint32_t len; 811 812 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm); 813 814 if (*queued) 815 goto out; 816 817 len = ntohl(rm->m_inc.i_hdr.h_len); 818 819 /* 820 * this is the only place which holds both the socket's rs_lock 821 * and the connection's c_lock 822 */ 823 mutex_enter(&rs->rs_lock); 824 825 /* 826 * If there is a little space in sndbuf, we don't queue anything, 827 * and userspace gets -EAGAIN. But poll() indicates there's send 828 * room. This can lead to bad behavior (spinning) if snd_bytes isn't 829 * freed up by incoming acks. So we check the *old* value of 830 * rs_snd_bytes here to allow the last msg to exceed the buffer, 831 * and poll() now knows no more data can be sent. 832 */ 833 if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) { 834 rs->rs_snd_bytes += len; 835 836 /* 837 * let recv side know we are close to send space exhaustion. 838 * This is probably not the optimal way to do it, as this 839 * means we set the flag on *all* messages as soon as our 840 * throughput hits a certain threshold. 841 */ 842 if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2) 843 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 844 845 list_insert_tail(&rs->rs_send_queue, rm); 846 set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags); 847 848 rdsv3_message_addref(rm); 849 rm->m_rs = rs; 850 851 /* 852 * The code ordering is a little weird, but we're 853 * trying to minimize the time we hold c_lock 854 */ 855 rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport, 856 dport, 0); 857 rm->m_inc.i_conn = conn; 858 rdsv3_message_addref(rm); /* XXX - called twice */ 859 860 mutex_enter(&conn->c_lock); 861 rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++); 862 list_insert_tail(&conn->c_send_queue, rm); 863 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 864 mutex_exit(&conn->c_lock); 865 866 RDSV3_DPRINTF5("rdsv3_send_queue_rm", 867 "queued msg %p len %d, rs %p bytes %d seq %llu", 868 rm, len, rs, rs->rs_snd_bytes, 869 (unsigned long long)ntohll( 870 rm->m_inc.i_hdr.h_sequence)); 871 872 *queued = 1; 873 } 874 875 mutex_exit(&rs->rs_lock); 876 877 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs); 878 out: 879 return (*queued); 880 } 881 882 static int 883 rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm, 884 struct msghdr *msg, int *allocated_mr) 885 { 886 struct cmsghdr *cmsg; 887 int ret = 0; 888 889 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs); 890 891 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 892 893 if (cmsg->cmsg_level != SOL_RDS) 894 continue; 895 896 RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d", 897 cmsg, rm, cmsg->cmsg_type); 898 /* 899 * As a side effect, RDMA_DEST and RDMA_MAP will set 900 * rm->m_rdma_cookie and rm->m_rdma_mr. 901 */ 902 switch (cmsg->cmsg_type) { 903 case RDS_CMSG_RDMA_ARGS: 904 ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg); 905 break; 906 907 case RDS_CMSG_RDMA_DEST: 908 ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg); 909 break; 910 911 case RDS_CMSG_RDMA_MAP: 912 ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg); 913 if (ret) 914 *allocated_mr = 1; 915 break; 916 917 default: 918 return (-EINVAL); 919 } 920 921 if (ret) 922 break; 923 } 924 925 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs); 926 927 return (ret); 928 } 929 930 extern unsigned long rdsv3_max_bcopy_size; 931 932 int 933 rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg, 934 size_t payload_len) 935 { 936 struct rsock *sk = rdsv3_rs_to_sk(rs); 937 struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name; 938 uint32_be_t daddr; 939 uint16_be_t dport; 940 struct rdsv3_message *rm = NULL; 941 struct rdsv3_connection *conn; 942 int ret = 0; 943 int queued = 0, allocated_mr = 0; 944 int nonblock = msg->msg_flags & MSG_DONTWAIT; 945 long timeo = rdsv3_sndtimeo(sk, nonblock); 946 947 RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs); 948 949 if (msg->msg_namelen) { 950 /* XXX fail non-unicast destination IPs? */ 951 if (msg->msg_namelen < sizeof (*usin) || 952 usin->sin_family != AF_INET_OFFLOAD) { 953 ret = -EINVAL; 954 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 955 goto out; 956 } 957 daddr = usin->sin_addr.s_addr; 958 dport = usin->sin_port; 959 } else { 960 /* We only care about consistency with ->connect() */ 961 mutex_enter(&sk->sk_lock); 962 daddr = rs->rs_conn_addr; 963 dport = rs->rs_conn_port; 964 mutex_exit(&sk->sk_lock); 965 } 966 967 /* racing with another thread binding seems ok here */ 968 if (daddr == 0 || rs->rs_bound_addr == 0) { 969 ret = -ENOTCONN; /* XXX not a great errno */ 970 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 971 goto out; 972 } 973 974 if (payload_len > rdsv3_max_bcopy_size) { 975 RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d", 976 payload_len); 977 ret = -EMSGSIZE; 978 goto out; 979 } 980 981 rm = rdsv3_message_copy_from_user(uio, payload_len); 982 if (IS_ERR(rm)) { 983 ret = PTR_ERR(rm); 984 RDSV3_DPRINTF2("rdsv3_sendmsg", 985 "rdsv3_message_copy_from_user failed %d", -ret); 986 rm = NULL; 987 goto out; 988 } 989 990 rm->m_daddr = daddr; 991 992 /* Parse any control messages the user may have included. */ 993 ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr); 994 if (ret) { 995 RDSV3_DPRINTF2("rdsv3_sendmsg", 996 "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d", 997 rs, rm, msg, ret); 998 goto out; 999 } 1000 1001 /* 1002 * rdsv3_conn_create has a spinlock that runs with IRQ off. 1003 * Caching the conn in the socket helps a lot. 1004 */ 1005 mutex_enter(&rs->rs_conn_lock); 1006 if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) { 1007 conn = rs->rs_conn; 1008 } else { 1009 conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr, 1010 daddr, rs->rs_transport, KM_NOSLEEP); 1011 if (IS_ERR(conn)) { 1012 mutex_exit(&rs->rs_conn_lock); 1013 ret = PTR_ERR(conn); 1014 RDSV3_DPRINTF2("rdsv3_sendmsg", 1015 "rdsv3_conn_create_outgoing failed %d", 1016 -ret); 1017 goto out; 1018 } 1019 rs->rs_conn = conn; 1020 } 1021 mutex_exit(&rs->rs_conn_lock); 1022 1023 if ((rm->m_rdma_cookie || rm->m_rdma_op) && 1024 conn->c_trans->xmit_rdma == NULL) { 1025 RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p", 1026 rm->m_rdma_op, conn->c_trans->xmit_rdma); 1027 ret = -EOPNOTSUPP; 1028 goto out; 1029 } 1030 1031 /* 1032 * If the connection is down, trigger a connect. We may 1033 * have scheduled a delayed reconnect however - in this case 1034 * we should not interfere. 1035 */ 1036 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1037 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1038 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1039 1040 ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs); 1041 if (ret) { 1042 mutex_enter(&rs->rs_congested_lock); 1043 rs->rs_seen_congestion = 1; 1044 cv_signal(&rs->rs_congested_cv); 1045 mutex_exit(&rs->rs_congested_lock); 1046 1047 RDSV3_DPRINTF2("rdsv3_sendmsg", 1048 "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret); 1049 goto out; 1050 } 1051 1052 (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport, 1053 &queued); 1054 if (!queued) { 1055 /* rdsv3_stats_inc(s_send_queue_full); */ 1056 /* XXX make sure this is reasonable */ 1057 if (payload_len > rdsv3_sk_sndbuf(rs)) { 1058 ret = -EMSGSIZE; 1059 RDSV3_DPRINTF2("rdsv3_sendmsg", 1060 "msgsize(%d) too big, returning: %d", 1061 payload_len, -ret); 1062 goto out; 1063 } 1064 if (nonblock) { 1065 ret = -EAGAIN; 1066 RDSV3_DPRINTF3("rdsv3_sendmsg", 1067 "send queue full (%d), returning: %d", 1068 payload_len, -ret); 1069 goto out; 1070 } 1071 1072 #if 0 1073 ret = rdsv3_wait_sig(sk->sk_sleep, 1074 (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1075 dport, &queued))); 1076 if (ret == 0) { 1077 /* signal/timeout pending */ 1078 RDSV3_DPRINTF2("rdsv3_sendmsg", 1079 "woke due to signal: %d", ret); 1080 ret = -ERESTART; 1081 goto out; 1082 } 1083 #else 1084 mutex_enter(&sk->sk_sleep->waitq_mutex); 1085 sk->sk_sleep->waitq_waiters++; 1086 while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1087 dport, &queued)) { 1088 ret = cv_wait_sig(&sk->sk_sleep->waitq_cv, 1089 &sk->sk_sleep->waitq_mutex); 1090 if (ret == 0) { 1091 /* signal/timeout pending */ 1092 RDSV3_DPRINTF2("rdsv3_sendmsg", 1093 "woke due to signal: %d", ret); 1094 ret = -EINTR; 1095 sk->sk_sleep->waitq_waiters--; 1096 mutex_exit(&sk->sk_sleep->waitq_mutex); 1097 goto out; 1098 } 1099 } 1100 sk->sk_sleep->waitq_waiters--; 1101 mutex_exit(&sk->sk_sleep->waitq_mutex); 1102 #endif 1103 1104 RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d", 1105 queued); 1106 1107 ASSERT(queued); 1108 ret = 0; 1109 } 1110 1111 /* 1112 * By now we've committed to the send. We reuse rdsv3_send_worker() 1113 * to retry sends in the rds thread if the transport asks us to. 1114 */ 1115 rdsv3_stats_inc(s_send_queued); 1116 1117 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1118 (void) rdsv3_send_worker(&conn->c_send_w.work); 1119 1120 rdsv3_message_put(rm); 1121 RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)", 1122 rs, payload_len); 1123 return (payload_len); 1124 1125 out: 1126 /* 1127 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. 1128 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN 1129 * or in any other way, we need to destroy the MR again 1130 */ 1131 if (allocated_mr) 1132 rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie), 1133 1); 1134 1135 if (rm) 1136 rdsv3_message_put(rm); 1137 return (ret); 1138 } 1139 1140 /* 1141 * Reply to a ping packet. 1142 */ 1143 int 1144 rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport) 1145 { 1146 struct rdsv3_message *rm; 1147 int ret = 0; 1148 1149 RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn); 1150 1151 rm = rdsv3_message_alloc(0, KM_NOSLEEP); 1152 if (!rm) { 1153 ret = -ENOMEM; 1154 goto out; 1155 } 1156 1157 rm->m_daddr = conn->c_faddr; 1158 1159 /* 1160 * If the connection is down, trigger a connect. We may 1161 * have scheduled a delayed reconnect however - in this case 1162 * we should not interfere. 1163 */ 1164 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1165 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1166 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1167 1168 ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL); 1169 if (ret) 1170 goto out; 1171 1172 mutex_enter(&conn->c_lock); 1173 list_insert_tail(&conn->c_send_queue, rm); 1174 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 1175 rdsv3_message_addref(rm); 1176 rm->m_inc.i_conn = conn; 1177 1178 rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport, 1179 conn->c_next_tx_seq); 1180 conn->c_next_tx_seq++; 1181 mutex_exit(&conn->c_lock); 1182 1183 rdsv3_stats_inc(s_send_queued); 1184 rdsv3_stats_inc(s_send_pong); 1185 1186 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1187 (void) rdsv3_send_xmit(conn); 1188 1189 rdsv3_message_put(rm); 1190 1191 RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn); 1192 return (0); 1193 1194 out: 1195 if (rm) 1196 rdsv3_message_put(rm); 1197 return (ret); 1198 } 1199