1 /* 2 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 3 */ 4 5 /* 6 * This file contains code imported from the OFED rds source file send.c 7 * Oracle elects to have and use the contents of send.c under and governed 8 * by the OpenIB.org BSD license (see below for full license text). However, 9 * the following notice accompanied the original version of this file: 10 */ 11 12 /* 13 * Copyright (c) 2006 Oracle. All rights reserved. 14 * 15 * This software is available to you under a choice of one of two 16 * licenses. You may choose to be licensed under the terms of the GNU 17 * General Public License (GPL) Version 2, available from the file 18 * COPYING in the main directory of this source tree, or the 19 * OpenIB.org BSD license below: 20 * 21 * Redistribution and use in source and binary forms, with or 22 * without modification, are permitted provided that the following 23 * conditions are met: 24 * 25 * - Redistributions of source code must retain the above 26 * copyright notice, this list of conditions and the following 27 * disclaimer. 28 * 29 * - Redistributions in binary form must reproduce the above 30 * copyright notice, this list of conditions and the following 31 * disclaimer in the documentation and/or other materials 32 * provided with the distribution. 33 * 34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 35 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 36 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 37 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 38 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 39 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 40 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 41 * SOFTWARE. 
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);

/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 *
 * Drops the in-flight message (c_xmit_rm) after unmapping any mapped RDMA
 * op, zeroes all per-connection transmit cursors, and moves everything on
 * the send queue onto the retransmit queue with ACK_REQUIRED and
 * RETRANSMITTED set.  Called when a connection is torn down/restarted.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_rdma_op *ro;

	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

	ASSERT(MUTEX_HELD(&conn->c_send_lock));

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		ro = rm->m_rdma_op;
		if (ro && ro->r_mapped) {
			RDSV3_DPRINTF2("rdsv3_send_reset",
			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
			    rm, rm->m_flags, ro->r_mapped,
			    ro->r_rdma_sg[0].mihdl,
			    ro->r_rdma_sg[0].swr.wr_sgl);
			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
		}
		/*
		 * Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory
		 */
		rdsv3_message_unmapped(conn->c_xmit_rm);
		rdsv3_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}

	/* Rewind all transmit cursors back to "nothing sent yet". */
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_map_queued = 0;

	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	mutex_enter(&conn->c_lock);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
			RDSV3_DPRINTF4("_send_reset",
			    "RT rm %p mflg 0x%x sgl %p",
			    rm, rm->m_flags,
			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
		}
	}
	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 * - tx queueing is a simple fifo list
 * - reassembly is optional and easily done by transports per conn
 * - no per flow rx lookup at all, straight to the socket
 * - less per-frag memory and wire overhead
 * Con:
 * - queued acks can be delayed behind large messages
 * Depends:
 * - small message latency is higher behind queued large messages
 * - large message latency isn't starved by intervening small sends
 *
 * Returns 0 when the queue was drained, or -EAGAIN when more work remains
 * (quota exhausted or a race was detected) so the caller should reschedule.
 * NOTE(review): the "restart:" label below is currently unreferenced —
 * presumably a leftover from the upstream goto-restart retry; confirm.
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct rdsv3_scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	list_t to_be_dropped;

restart:
	if (!rdsv3_conn_up(conn))
		goto out;

	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

	/* messages we decide not to retransmit are parked here */
	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue. We only have one task feeding the connection at a time. If
	 * another thread is already feeding the queue then we back off. This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!mutex_tryenter(&conn->c_send_lock)) {
		RDSV3_DPRINTF4("rdsv3_send_xmit",
		    "Another thread running(conn: %p)", conn);
		rdsv3_stats_inc(s_send_sem_contention);
		ret = -ENOMEM;
		goto out;
	}
	atomic_add_32(&conn->c_senders, 1);

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if need to send a congestion map update if we're
		 * between sending messages. The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function. For all others, we create allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
			    conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/*
		 * If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm != NULL &&
		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
		    conn->c_xmit_sg == rm->m_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;

			/* Release the reference to the previous message. */
			rdsv3_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so. */
		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map != NULL) {
				conn->c_map_offset = 0;
				conn->c_map_bytes =
				    sizeof (struct rdsv3_header) +
				    RDSV3_CONG_MAP_BYTES;
				continue;
			}

			rm = rdsv3_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection. We can use this ref while holding the
		 * send_sem.. rdsv3_send_reset() is serialized with it.
		 */
		if (rm == NULL) {
			unsigned int len;

			mutex_enter(&conn->c_lock);

			if (!list_is_empty(&conn->c_send_queue)) {
				rm = list_remove_head(&conn->c_send_queue);
				rdsv3_message_addref(rm);

				/*
				 * Move the message from the send queue to
				 * the retransmit
				 * list right away.
				 */
				list_insert_tail(&conn->c_retrans, rm);
			}

			mutex_exit(&conn->c_lock);

			if (rm == NULL) {
				was_empty = 1;
				break;
			}

			/*
			 * Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with
			 * RDMA ops.
			 */
			if (rm->m_rdma_op &&
			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
				mutex_enter(&conn->c_lock);
				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
				    &rm->m_flags))
					list_remove_node(&rm->m_conn_item);
				list_insert_tail(&to_be_dropped, rm);
				mutex_exit(&conn->c_lock);
				rdsv3_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets =
				    rdsv3_sysctl_max_unacked_packets;
				conn->c_unacked_bytes =
				    rdsv3_sysctl_max_unacked_bytes;
				rdsv3_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Try and send an rdma message. Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
		 */
		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;
			/*
			 * The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
		}

		/* Push header and payload bytes; advance the cursors. */
		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
		    conn->c_xmit_sg < rm->m_nents) {
			ret = conn->c_trans->xmit(conn, rm,
			    conn->c_xmit_hdr_off,
			    conn->c_xmit_sg,
			    conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			/* header bytes are consumed before payload bytes */
			if (conn->c_xmit_hdr_off <
			    sizeof (struct rdsv3_header)) {
				tmp = min(ret,
				    sizeof (struct rdsv3_header) -
				    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->m_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min(ret, rdsv3_sg_len(sg) -
				    conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					ASSERT(!(ret != 0 &&
					    conn->c_xmit_sg == rm->m_nents));
				}
			}
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_is_empty(&to_be_dropped))
		rdsv3_send_remove_from_sock(&to_be_dropped, RDSV3_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock. If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem. If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty. It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_exit(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
		/*
		 * We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
		 */
		ret = -EAGAIN;
	}

	atomic_dec_32(&conn->c_senders);

	if (ret == 0 && was_empty) {
		/*
		 * A simple bit test would be way faster than taking the
		 * spin lock
		 */
		mutex_enter(&conn->c_lock);
		if (!list_is_empty(&conn->c_send_queue)) {
			rdsv3_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		mutex_exit(&conn->c_lock);
	}

out:
	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
	    conn, ret);
	return (ret);
}

/*
 * Deduct a departing message's payload length from the socket's send-buffer
 * accounting.  Caller must hold rs_lock.
 */
static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
	uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

	ASSERT(mutex_owned(&rs->rs_lock));

	ASSERT(rs->rs_snd_bytes >= len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rdsv3_stats_inc(s_send_queue_empty);
}

/*
 * Is this message covered by 'ack'?  Defers to the transport's is_acked
 * callback when one is supplied, else compares sequence numbers directly.
 */
static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
	if (is_acked)
		return (is_acked(rm, ack));
	return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number greater than or equal to the given sequence
 * number.
447 */ 448 int 449 rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq) 450 { 451 struct rdsv3_message *rm; 452 int ret = 1; 453 454 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn); 455 456 mutex_enter(&conn->c_lock); 457 458 /* XXX - original code spits out warning */ 459 rm = list_head(&conn->c_retrans); 460 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 461 ret = 0; 462 463 /* XXX - original code spits out warning */ 464 rm = list_head(&conn->c_send_queue); 465 if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq) 466 ret = 0; 467 468 mutex_exit(&conn->c_lock); 469 470 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn); 471 472 return (ret); 473 } 474 475 /* 476 * This is pretty similar to what happens below in the ACK 477 * handling code - except that we call here as soon as we get 478 * the IB send completion on the RDMA op and the accompanying 479 * message. 480 */ 481 void 482 rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status) 483 { 484 struct rdsv3_sock *rs = NULL; 485 struct rdsv3_rdma_op *ro; 486 struct rdsv3_notifier *notifier; 487 488 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm); 489 490 mutex_enter(&rm->m_rs_lock); 491 492 ro = rm->m_rdma_op; 493 if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) && 494 ro && ro->r_notify && ro->r_notifier) { 495 notifier = ro->r_notifier; 496 rs = rm->m_rs; 497 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs)); 498 499 notifier->n_status = status; 500 mutex_enter(&rs->rs_lock); 501 list_insert_tail(&rs->rs_notify_queue, notifier); 502 mutex_exit(&rs->rs_lock); 503 ro->r_notifier = NULL; 504 } 505 506 mutex_exit(&rm->m_rs_lock); 507 508 if (rs) { 509 rdsv3_wake_sk_sleep(rs); 510 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs)); 511 } 512 513 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm); 514 } 515 516 /* 517 * This is the same as rdsv3_rdma_send_complete except we 518 * don't do any locking - we have all the ingredients (message, 519 * socket, socket 
lock) and can just move the notifier. 520 */ 521 static inline void 522 __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm, 523 int status) 524 { 525 struct rdsv3_rdma_op *ro; 526 void *ic; 527 528 RDSV3_DPRINTF4("__rdsv3_rdma_send_complete", 529 "Enter(rs: %p, rm: %p)", rs, rm); 530 531 ro = rm->m_rdma_op; 532 if (ro && ro->r_notify && ro->r_notifier) { 533 ro->r_notifier->n_status = status; 534 list_insert_tail(&rs->rs_notify_queue, ro->r_notifier); 535 ro->r_notifier = NULL; 536 } 537 538 /* No need to wake the app - caller does this */ 539 } 540 541 /* 542 * This is called from the IB send completion when we detect 543 * a RDMA operation that failed with remote access error. 544 * So speed is not an issue here. 545 */ 546 struct rdsv3_message * 547 rdsv3_send_get_message(struct rdsv3_connection *conn, 548 struct rdsv3_rdma_op *op) 549 { 550 struct rdsv3_message *rm, *tmp, *found = NULL; 551 552 RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn); 553 554 mutex_enter(&conn->c_lock); 555 556 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) { 557 if (rm->m_rdma_op == op) { 558 atomic_add_32(&rm->m_refcount, 1); 559 found = rm; 560 goto out; 561 } 562 } 563 564 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue, 565 m_conn_item) { 566 if (rm->m_rdma_op == op) { 567 atomic_add_32(&rm->m_refcount, 1); 568 found = rm; 569 break; 570 } 571 } 572 573 out: 574 mutex_exit(&conn->c_lock); 575 576 return (found); 577 } 578 579 /* 580 * This removes messages from the socket's list if they're on it. The list 581 * argument must be private to the caller, we must be able to modify it 582 * without locks. The messages must have a reference held for their 583 * position on the list. This function will drop that reference after 584 * removing the messages from the 'messages' list regardless of if it found 585 * the messages on the socket list or not. 
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_message *rm;

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

	while (!list_is_empty(messages)) {
		int was_on_sock = 0;
		rm = list_remove_head(messages);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock. If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		mutex_enter(&rm->m_rs_lock);
		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		/*
		 * Messages may belong to different sockets; swap the held
		 * socket reference only when the owner changes.
		 */
		if (rs != rm->m_rs) {
			if (rs) {
				rdsv3_wake_sk_sleep(rs);
				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
		}

		mutex_enter(&rs->rs_lock);
		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
			struct rdsv3_notifier *notifier;

			list_remove_node(&rm->m_sock_item);
			rdsv3_send_sndbuf_remove(rs, rm);
			/*
			 * Queue the RDMA notifier if the op failed
			 * (status != 0) or the app asked to be notified.
			 */
			if (ro && ro->r_notifier &&
			    (status || ro->r_notify)) {
				notifier = ro->r_notifier;
				list_insert_tail(&rs->rs_notify_queue,
				    notifier);
				/* don't overwrite an earlier error status */
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->m_rdma_op->r_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		mutex_exit(&rs->rs_lock);

unlock_and_drop:
		mutex_exit(&rm->m_rs_lock);
		/* drop the caller's list reference ... */
		rdsv3_message_put(rm);
		/* ... and the socket-list reference if we cleared ON_SOCK */
		if (was_on_sock)
			rdsv3_message_put(rm);
	}

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}

/*
 * Transports call here
 * when they've determined that the receiver queued
 * messages up to, and including, the given sequence number. Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction. Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
	struct rdsv3_message *rm, *tmp;
	list_t list;

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	mutex_enter(&conn->c_lock);

	/*
	 * The retrans queue is in sequence order, so stop at the first
	 * message that isn't covered by 'ack'.
	 */
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rdsv3_send_is_acked(rm, ack, is_acked))
			break;

		list_remove_node(&rm->m_conn_item);
		list_insert_tail(&list, rm);
		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	}

#if 0
XXX
	/* order flag updates with spin locks */
	if (!list_is_empty(&list))
		smp_mb__after_clear_bit();
#endif

	mutex_exit(&conn->c_lock);

	/* now remove the messages from the sock list as needed */
	rdsv3_send_remove_from_sock(&list, RDSV3_RDMA_SUCCESS);

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

/*
 * Drop every queued message on 'rs' destined for 'dest' (or all messages
 * when dest is NULL), cancelling their RDMA notifiers and unlinking them
 * from both the socket and connection queues.  Used by RDS_CANCEL_SENT_TO
 * and socket teardown.
 */
void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_connection *conn;
	list_t list;
	int wake = 0;

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_sock_item));

	/* get all the messages we're dropping under the rs lock */
	mutex_enter(&rs->rs_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
	    m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;
		wake = 1;
		list_remove(&rs->rs_send_queue, rm);
		list_insert_tail(&list, rm);
		rdsv3_send_sndbuf_remove(rs, rm);
		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
	}

	mutex_exit(&rs->rs_lock);

	conn = NULL;

	/* now remove the messages from the conn list as needed */
	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
		/*
		 * We do this here rather than in the loop above, so that
		 * we don't have to nest m_rs_lock under rs->rs_lock
		 */
		mutex_enter(&rm->m_rs_lock);
		/* If this is a RDMA operation, notify the app. */
		__rdsv3_rdma_send_complete(rs, rm, RDSV3_RDMA_CANCELED);
		rm->m_rs = NULL;
		mutex_exit(&rm->m_rs_lock);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the conn. If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 */
		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
			continue;

		/* messages may span connections; re-lock only on change */
		if (conn != rm->m_inc.i_conn) {
			if (conn)
				mutex_exit(&conn->c_lock);
			conn = rm->m_inc.i_conn;
			mutex_enter(&conn->c_lock);
		}

		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
			list_remove_node(&rm->m_conn_item);
			rdsv3_message_put(rm);
		}
	}

	if (conn)
		mutex_exit(&conn->c_lock);

	if (wake)
		rdsv3_wake_sk_sleep(rs);

	/* wait for any in-flight use to finish, then drop the sock ref */
	while (!list_is_empty(&list)) {
		rm = list_remove_head(&list);

		rdsv3_message_wait(rm);
		rdsv3_message_put(rm);
	}

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}

/*
 * we only want this to fire once so we use the callers 'queued'.
It's 791 * possible that another thread can race with us and remove the 792 * message from the flow with RDSV3_CANCEL_SENT_TO. 793 */ 794 static int 795 rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn, 796 struct rdsv3_message *rm, uint16_be_t sport, 797 uint16_be_t dport, int *queued) 798 { 799 uint32_t len; 800 801 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm); 802 803 if (*queued) 804 goto out; 805 806 len = ntohl(rm->m_inc.i_hdr.h_len); 807 808 /* 809 * this is the only place which holds both the socket's rs_lock 810 * and the connection's c_lock 811 */ 812 mutex_enter(&rs->rs_lock); 813 814 /* 815 * If there is a little space in sndbuf, we don't queue anything, 816 * and userspace gets -EAGAIN. But poll() indicates there's send 817 * room. This can lead to bad behavior (spinning) if snd_bytes isn't 818 * freed up by incoming acks. So we check the *old* value of 819 * rs_snd_bytes here to allow the last msg to exceed the buffer, 820 * and poll() now knows no more data can be sent. 821 */ 822 if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) { 823 rs->rs_snd_bytes += len; 824 825 /* 826 * let recv side know we are close to send space exhaustion. 827 * This is probably not the optimal way to do it, as this 828 * means we set the flag on *all* messages as soon as our 829 * throughput hits a certain threshold. 
830 */ 831 if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2) 832 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags); 833 834 list_insert_tail(&rs->rs_send_queue, rm); 835 set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags); 836 837 rdsv3_message_addref(rm); 838 rm->m_rs = rs; 839 840 /* 841 * The code ordering is a little weird, but we're 842 * trying to minimize the time we hold c_lock 843 */ 844 rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport, 845 dport, 0); 846 rm->m_inc.i_conn = conn; 847 rdsv3_message_addref(rm); /* XXX - called twice */ 848 849 mutex_enter(&conn->c_lock); 850 rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++); 851 list_insert_tail(&conn->c_send_queue, rm); 852 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 853 mutex_exit(&conn->c_lock); 854 855 RDSV3_DPRINTF5("rdsv3_send_queue_rm", 856 "queued msg %p len %d, rs %p bytes %d seq %llu", 857 rm, len, rs, rs->rs_snd_bytes, 858 (unsigned long long)ntohll( 859 rm->m_inc.i_hdr.h_sequence)); 860 861 *queued = 1; 862 } 863 864 mutex_exit(&rs->rs_lock); 865 866 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs); 867 out: 868 return (*queued); 869 } 870 871 static int 872 rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm, 873 struct msghdr *msg, int *allocated_mr) 874 { 875 struct cmsghdr *cmsg; 876 int ret = 0; 877 878 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs); 879 880 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 881 882 if (cmsg->cmsg_level != SOL_RDS) 883 continue; 884 885 RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d", 886 cmsg, rm, cmsg->cmsg_type); 887 /* 888 * As a side effect, RDMA_DEST and RDMA_MAP will set 889 * rm->m_rdma_cookie and rm->m_rdma_mr. 
890 */ 891 switch (cmsg->cmsg_type) { 892 case RDSV3_CMSG_RDMA_ARGS: 893 ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg); 894 break; 895 896 case RDSV3_CMSG_RDMA_DEST: 897 ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg); 898 break; 899 900 case RDSV3_CMSG_RDMA_MAP: 901 ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg); 902 if (ret) 903 *allocated_mr = 1; 904 break; 905 906 default: 907 return (-EINVAL); 908 } 909 910 if (ret) 911 break; 912 } 913 914 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs); 915 916 return (ret); 917 } 918 919 int 920 rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg, 921 size_t payload_len) 922 { 923 struct rsock *sk = rdsv3_rs_to_sk(rs); 924 struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name; 925 uint32_be_t daddr; 926 uint16_be_t dport; 927 struct rdsv3_message *rm = NULL; 928 struct rdsv3_connection *conn; 929 int ret = 0; 930 int queued = 0, allocated_mr = 0; 931 int nonblock = msg->msg_flags & MSG_DONTWAIT; 932 long timeo = rdsv3_sndtimeo(sk, nonblock); 933 934 RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs); 935 936 if (msg->msg_namelen) { 937 /* XXX fail non-unicast destination IPs? 
*/ 938 if (msg->msg_namelen < sizeof (*usin) || 939 usin->sin_family != AF_INET_OFFLOAD) { 940 ret = -EINVAL; 941 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 942 goto out; 943 } 944 daddr = usin->sin_addr.s_addr; 945 dport = usin->sin_port; 946 } else { 947 /* We only care about consistency with ->connect() */ 948 mutex_enter(&sk->sk_lock); 949 daddr = rs->rs_conn_addr; 950 dport = rs->rs_conn_port; 951 mutex_exit(&sk->sk_lock); 952 } 953 954 /* racing with another thread binding seems ok here */ 955 if (daddr == 0 || rs->rs_bound_addr == 0) { 956 ret = -ENOTCONN; /* XXX not a great errno */ 957 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret); 958 goto out; 959 } 960 961 rm = rdsv3_message_copy_from_user(uio, payload_len); 962 if (IS_ERR(rm)) { 963 ret = PTR_ERR(rm); 964 RDSV3_DPRINTF2("rdsv3_sendmsg", 965 "rdsv3_message_copy_from_user failed %d", -ret); 966 rm = NULL; 967 goto out; 968 } 969 970 rm->m_daddr = daddr; 971 972 /* Parse any control messages the user may have included. */ 973 ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr); 974 if (ret) { 975 RDSV3_DPRINTF2("rdsv3_sendmsg", 976 "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d", 977 rs, rm, msg, ret); 978 goto out; 979 } 980 981 /* 982 * rdsv3_conn_create has a spinlock that runs with IRQ off. 983 * Caching the conn in the socket helps a lot. 
984 */ 985 mutex_enter(&rs->rs_conn_lock); 986 if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) { 987 conn = rs->rs_conn; 988 } else { 989 conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr, 990 daddr, rs->rs_transport, KM_NOSLEEP); 991 if (IS_ERR(conn)) { 992 mutex_exit(&rs->rs_conn_lock); 993 ret = PTR_ERR(conn); 994 RDSV3_DPRINTF2("rdsv3_sendmsg", 995 "rdsv3_conn_create_outgoing failed %d", 996 -ret); 997 goto out; 998 } 999 rs->rs_conn = conn; 1000 } 1001 mutex_exit(&rs->rs_conn_lock); 1002 1003 if ((rm->m_rdma_cookie || rm->m_rdma_op) && 1004 conn->c_trans->xmit_rdma == NULL) { 1005 RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p", 1006 rm->m_rdma_op, conn->c_trans->xmit_rdma); 1007 ret = -EOPNOTSUPP; 1008 goto out; 1009 } 1010 1011 /* 1012 * If the connection is down, trigger a connect. We may 1013 * have scheduled a delayed reconnect however - in this case 1014 * we should not interfere. 1015 */ 1016 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1017 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1018 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1019 1020 ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs); 1021 if (ret) { 1022 mutex_enter(&rs->rs_congested_lock); 1023 rs->rs_seen_congestion = 1; 1024 cv_signal(&rs->rs_congested_cv); 1025 mutex_exit(&rs->rs_congested_lock); 1026 1027 RDSV3_DPRINTF2("rdsv3_sendmsg", 1028 "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret); 1029 goto out; 1030 } 1031 1032 (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport, 1033 &queued); 1034 if (!queued) { 1035 /* rdsv3_stats_inc(s_send_queue_full); */ 1036 /* XXX make sure this is reasonable */ 1037 if (payload_len > rdsv3_sk_sndbuf(rs)) { 1038 ret = -EMSGSIZE; 1039 RDSV3_DPRINTF2("rdsv3_sendmsg", 1040 "msgsize(%d) too big, returning: %d", 1041 payload_len, -ret); 1042 goto out; 1043 } 1044 if (nonblock) { 1045 ret = -EAGAIN; 1046 RDSV3_DPRINTF3("rdsv3_sendmsg", 1047 "send queue full (%d), returning: %d", 
1048 payload_len, -ret); 1049 goto out; 1050 } 1051 1052 #if 0 1053 ret = rdsv3_wait_sig(sk->sk_sleep, 1054 (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1055 dport, &queued))); 1056 if (ret == 0) { 1057 /* signal/timeout pending */ 1058 RDSV3_DPRINTF2("rdsv3_sendmsg", 1059 "woke due to signal: %d", ret); 1060 ret = -ERESTART; 1061 goto out; 1062 } 1063 #else 1064 mutex_enter(&sk->sk_sleep->waitq_mutex); 1065 sk->sk_sleep->waitq_waiters++; 1066 while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, 1067 dport, &queued)) { 1068 ret = cv_wait_sig(&sk->sk_sleep->waitq_cv, 1069 &sk->sk_sleep->waitq_mutex); 1070 if (ret == 0) { 1071 /* signal/timeout pending */ 1072 RDSV3_DPRINTF2("rdsv3_sendmsg", 1073 "woke due to signal: %d", ret); 1074 ret = -ERESTART; 1075 sk->sk_sleep->waitq_waiters--; 1076 mutex_exit(&sk->sk_sleep->waitq_mutex); 1077 goto out; 1078 } 1079 } 1080 sk->sk_sleep->waitq_waiters--; 1081 mutex_exit(&sk->sk_sleep->waitq_mutex); 1082 #endif 1083 1084 RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d", 1085 queued); 1086 1087 ASSERT(queued); 1088 ret = 0; 1089 } 1090 1091 /* 1092 * By now we've committed to the send. We reuse rdsv3_send_worker() 1093 * to retry sends in the rds thread if the transport asks us to. 1094 */ 1095 rdsv3_stats_inc(s_send_queued); 1096 1097 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1098 (void) rdsv3_send_xmit(conn); 1099 1100 rdsv3_message_put(rm); 1101 RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)", 1102 rs, payload_len); 1103 return (payload_len); 1104 1105 out: 1106 /* 1107 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. 1108 * If the sendmsg goes through, we keep the MR. 
If it fails with EAGAIN 1109 * or in any other way, we need to destroy the MR again 1110 */ 1111 if (allocated_mr) 1112 rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie), 1113 1); 1114 1115 if (rm) 1116 rdsv3_message_put(rm); 1117 return (ret); 1118 } 1119 1120 /* 1121 * Reply to a ping packet. 1122 */ 1123 int 1124 rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport) 1125 { 1126 struct rdsv3_message *rm; 1127 int ret = 0; 1128 1129 RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn); 1130 1131 rm = rdsv3_message_alloc(0, KM_NOSLEEP); 1132 if (!rm) { 1133 ret = -ENOMEM; 1134 goto out; 1135 } 1136 1137 rm->m_daddr = conn->c_faddr; 1138 1139 /* 1140 * If the connection is down, trigger a connect. We may 1141 * have scheduled a delayed reconnect however - in this case 1142 * we should not interfere. 1143 */ 1144 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN && 1145 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags)) 1146 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0); 1147 1148 ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL); 1149 if (ret) 1150 goto out; 1151 1152 mutex_enter(&conn->c_lock); 1153 list_insert_tail(&conn->c_send_queue, rm); 1154 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags); 1155 rdsv3_message_addref(rm); 1156 rm->m_inc.i_conn = conn; 1157 1158 rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport, 1159 conn->c_next_tx_seq); 1160 conn->c_next_tx_seq++; 1161 mutex_exit(&conn->c_lock); 1162 1163 rdsv3_stats_inc(s_send_queued); 1164 rdsv3_stats_inc(s_send_pong); 1165 1166 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags)) 1167 (void) rdsv3_send_xmit(conn); 1168 1169 rdsv3_message_put(rm); 1170 1171 RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn); 1172 return (0); 1173 1174 out: 1175 if (rm) 1176 rdsv3_message_put(rm); 1177 return (ret); 1178 } 1179