/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
/*
 * Reset the send state.  Caller must hold c_send_lock when calling here.
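 * This runs when a connection is being shut down or restarted: any
 * in-flight RDMA mapping is released, the transmit cursor is cleared,
 * and everything still on the retransmit list is pushed back onto the
 * send queue to be sent again once the connection comes back up.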
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_rdma_op *ro;

	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		ro = rm->m_rdma_op;
		if (ro && ro->r_mapped) {
			RDSV3_DPRINTF2("rdsv3_send_reset",
			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
			    rm, rm->m_flags, ro->r_mapped,
			    ro->r_rdma_sg[0].mihdl,
			    ro->r_rdma_sg[0].swr.wr_sgl);
			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
		}
		/*
		 * Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory.
		 */
		rdsv3_message_unmapped(conn->c_xmit_rm);
		rdsv3_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;

	conn->c_map_queued = 0;

	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	mutex_enter(&conn->c_lock);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
			RDSV3_DPRINTF4("_send_reset",
			    "RT rm %p mflg 0x%x sgl %p",
			    rm, rm->m_flags,
			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
		}
	}
	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *  - tx queueing is a simple fifo list
 *  - reassembly is optional and easily done by transports per conn
 *  - no per flow rx lookup at all, straight to the socket
 *  - less per-frag memory and wire overhead
 * Con:
 *  - queued acks can be delayed behind large messages
 * Depends:
 *  - small message latency is higher behind queued large messages
 *  - large message latency isn't starved by intervening small sends
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct rdsv3_scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	list_t to_be_dropped;

	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 *
	 * The sem holder will issue a retry if they notice that someone queued
	 * a message after they stopped walking the send queue but before they
	 * dropped the sem.
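	 * In this port the "sem" is the connection's c_send_lock mutex,
	 * taken with mutex_tryenter() just below, so a contending sender
	 * simply backs off and relies on the lock holder to retry.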
	 */
	if (!mutex_tryenter(&conn->c_send_lock)) {
		RDSV3_DPRINTF4("rdsv3_send_xmit",
		    "Another thread running(conn: %p)", conn);
		rdsv3_stats_inc(s_send_sem_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if we need to send a congestion map update if we're
		 * between sending messages.  The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function.  For all others, we allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
			    conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/*
		 * If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm != NULL &&
		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
		    conn->c_xmit_sg == rm->m_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;

			/* Release the reference to the previous message. */
			rdsv3_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so. */
		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map != NULL) {
				conn->c_map_offset = 0;
				conn->c_map_bytes =
				    sizeof (struct rdsv3_header) +
				    RDSV3_CONG_MAP_BYTES;
				continue;
			}

			rm = rdsv3_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem; rdsv3_send_reset() is serialized with it.
		 */
		if (rm == NULL) {
			unsigned int len;

			mutex_enter(&conn->c_lock);

			if (!list_is_empty(&conn->c_send_queue)) {
				rm = list_remove_head(&conn->c_send_queue);
				rdsv3_message_addref(rm);

				/*
				 * Move the message from the send queue to
				 * the retransmit list right away.
				 */
				list_insert_tail(&conn->c_retrans, rm);
			}

			mutex_exit(&conn->c_lock);

			if (rm == NULL) {
				was_empty = 1;
				break;
			}

			/*
			 * Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state.  We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with
			 * RDMA ops.
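			 * Such messages are collected on the local
			 * to_be_dropped list and completed with
			 * RDSV3_RDMA_DROPPED once the send loop finishes.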
			 */
			if (rm->m_rdma_op &&
			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
				mutex_enter(&conn->c_lock);
				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
				    &rm->m_flags))
					list_remove_node(&rm->m_conn_item);
				list_insert_tail(&to_be_dropped, rm);
				mutex_exit(&conn->c_lock);
				rdsv3_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets =
				    rdsv3_sysctl_max_unacked_packets;
				conn->c_unacked_bytes =
				    rdsv3_sysctl_max_unacked_bytes;
				rdsv3_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Try and send an rdma message.  Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
		 */
		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;
			/*
			 * The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue.
			 */
			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
		}

		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
		    conn->c_xmit_sg < rm->m_nents) {
			ret = conn->c_trans->xmit(conn, rm,
			    conn->c_xmit_hdr_off,
			    conn->c_xmit_sg,
			    conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off <
			    sizeof (struct rdsv3_header)) {
				tmp = min(ret,
				    sizeof (struct rdsv3_header) -
				    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->m_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min(ret, rdsv3_sg_len(sg) -
				    conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					ASSERT(!(ret != 0 &&
					    conn->c_xmit_sg == rm->m_nents));
				}
			}
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_is_empty(&to_be_dropped))
		rdsv3_send_remove_from_sock(&to_be_dropped, RDSV3_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock.  If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem.  If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty.  It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_exit(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
		/*
		 * We exhausted the send quota, but there's work left to
		 * do.  Return and (re-)schedule the send worker.
		 */
		ret = -EAGAIN;
	}

	if (ret == 0 && was_empty) {
		/*
		 * A simple bit test would be way faster than taking the
		 * spin lock.
		 */
		mutex_enter(&conn->c_lock);
		if (!list_is_empty(&conn->c_send_queue)) {
			rdsv3_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		mutex_exit(&conn->c_lock);
	}

out:
	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
	    conn, ret);
	return (ret);
}

static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
	uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

	ASSERT(mutex_owned(&rs->rs_lock));

	ASSERT(rs->rs_snd_bytes >= len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rdsv3_stats_inc(s_send_queue_empty);
}

static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
	if (is_acked)
		return (is_acked(rm, ack));
	return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number greater than or equal to the given sequence
 * number.
 */
int
rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
{
	struct rdsv3_message *rm;
	int ret = 1;

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_retrans);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_send_queue);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);

	return (ret);
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void
rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_rdma_op *ro;
	struct rdsv3_notifier *notifier;

	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);

	mutex_enter(&rm->m_rs_lock);

	ro = rm->m_rdma_op;
	if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
	    ro && ro->r_notify && ro->r_notifier) {
		notifier = ro->r_notifier;
		rs = rm->m_rs;
		rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));

		notifier->n_status = status;
		mutex_enter(&rs->rs_lock);
		list_insert_tail(&rs->rs_notify_queue, notifier);
		mutex_exit(&rs->rs_lock);

		ro->r_notifier = NULL;
	}

	mutex_exit(&rm->m_rs_lock);

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
}

/*
 * This is the same as rdsv3_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
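 * In this file it is called from rdsv3_send_drop_to() with the message's
 * m_rs_lock already held.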
 */
static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    int status)
{
	struct rdsv3_rdma_op *ro;
	void *ic;

	RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
	    "Enter(rs: %p, rm: %p)", rs, rm);

	ro = rm->m_rdma_op;
	if (ro && ro->r_notify && ro->r_notifier) {
		ro->r_notifier->n_status = status;
		list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
		ro->r_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * an RDMA operation that failed with a remote access error.
 * So speed is not an issue here.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
	struct rdsv3_message *rm, *tmp, *found = NULL;

	RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			goto out;
		}
	}

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
	    m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			break;
		}
	}

out:
	mutex_exit(&conn->c_lock);

	return (found);
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller; we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_message *rm;

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

	while (!list_is_empty(messages)) {
		int was_on_sock = 0;
		rm = list_remove_head(messages);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it.  It does not prevent the
		 * message from being removed from the socket, though.
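		 * Re-checking RDSV3_MSG_ON_SOCK under rs_lock below is what
		 * makes the removal safe against that race.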
		 */
		mutex_enter(&rm->m_rs_lock);
		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rdsv3_wake_sk_sleep(rs);
				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
		}

		mutex_enter(&rs->rs_lock);
		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
			struct rdsv3_notifier *notifier;

			list_remove_node(&rm->m_sock_item);
			rdsv3_send_sndbuf_remove(rs, rm);
			if (ro && ro->r_notifier &&
			    (status || ro->r_notify)) {
				notifier = ro->r_notifier;
				list_insert_tail(&rs->rs_notify_queue,
				    notifier);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->m_rdma_op->r_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		mutex_exit(&rs->rs_lock);

unlock_and_drop:
		mutex_exit(&rm->m_rs_lock);
		rdsv3_message_put(rm);
		if (was_on_sock)
			rdsv3_message_put(rm);
	}

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
	struct rdsv3_message *rm, *tmp;
	list_t list;

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rdsv3_send_is_acked(rm, ack, is_acked))
			break;

		list_remove_node(&rm->m_conn_item);
		list_insert_tail(&list, rm);
		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	}

#if 0
XXX
	/* order flag updates with spin locks */
	if (!list_is_empty(&list))
		smp_mb__after_clear_bit();
#endif

	mutex_exit(&conn->c_lock);

	/* now remove the messages from the sock list as needed */
	rdsv3_send_remove_from_sock(&list, RDSV3_RDMA_SUCCESS);

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_connection *conn;
	list_t list;
	int wake = 0;

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_sock_item));

	/* get all the messages we're dropping under the rs lock */
	mutex_enter(&rs->rs_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
	    m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		wake = 1;
		list_remove(&rs->rs_send_queue, rm);
		list_insert_tail(&list, rm);
		rdsv3_send_sndbuf_remove(rs, rm);
		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
	}

	mutex_exit(&rs->rs_lock);

	conn = NULL;

	/* now remove the messages from the conn list as needed */
	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
		/*
		 * We do this here rather than in the loop above, so that
		 * we don't have to nest m_rs_lock under rs->rs_lock.
		 */
		mutex_enter(&rm->m_rs_lock);
		/* If this is an RDMA operation, notify the app. */
		__rdsv3_rdma_send_complete(rs, rm, RDSV3_RDMA_CANCELED);
		rm->m_rs = NULL;
		mutex_exit(&rm->m_rs_lock);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the conn.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 */
		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
			continue;

		if (conn != rm->m_inc.i_conn) {
			if (conn)
				mutex_exit(&conn->c_lock);
			conn = rm->m_inc.i_conn;
			mutex_enter(&conn->c_lock);
		}

		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
			list_remove_node(&rm->m_conn_item);
			rdsv3_message_put(rm);
		}
	}

	if (conn)
		mutex_exit(&conn->c_lock);

	if (wake)
		rdsv3_wake_sk_sleep(rs);

	while (!list_is_empty(&list)) {
		rm = list_remove_head(&list);

		rdsv3_message_wait(rm);
		rdsv3_message_put(rm);
	}

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDSV3_CANCEL_SENT_TO.
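 * Returns nonzero (and sets *queued) once the message has been placed on
 * both the socket's and the connection's send queues.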
 */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
	uint32_t len;

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);

	if (*queued)
		goto out;

	len = ntohl(rm->m_inc.i_hdr.h_len);

	/*
	 * this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock
	 */
	mutex_enter(&rs->rs_lock);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/*
		 * let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
			set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

		list_insert_tail(&rs->rs_send_queue, rm);
		set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

		rdsv3_message_addref(rm);
		rm->m_rs = rs;

		/*
		 * The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock
		 */
		rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
		    dport, 0);
		rm->m_inc.i_conn = conn;
		rdsv3_message_addref(rm);	/* XXX - called twice */

		mutex_enter(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
		list_insert_tail(&conn->c_send_queue, rm);
		set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
		mutex_exit(&conn->c_lock);

		RDSV3_DPRINTF5("rdsv3_send_queue_rm",
		    "queued msg %p len %d, rs %p bytes %d seq %llu",
		    rm, len, rs, rs->rs_snd_bytes,
		    (unsigned long long)ntohll(
		    rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	mutex_exit(&rs->rs_lock);

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
	return (*queued);
}

static int
rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
		    cmsg, rm, cmsg->cmsg_type);
		/*
		 * As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->m_rdma_cookie and rm->m_rdma_mr.
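		 * Any other cmsg type at the SOL_RDS level is rejected
		 * with -EINVAL.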
		 */
		switch (cmsg->cmsg_type) {
		case RDSV3_CMSG_RDMA_ARGS:
			ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDSV3_CMSG_RDMA_DEST:
			ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDSV3_CMSG_RDMA_MAP:
			ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
			if (ret)
				*allocated_mr = 1;
			break;

		default:
			return (-EINVAL);
		}

		if (ret)
			break;
	}

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);

	return (ret);
}

int
rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
    size_t payload_len)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
	uint32_be_t daddr;
	uint16_be_t dport;
	struct rdsv3_message *rm = NULL;
	struct rdsv3_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = rdsv3_sndtimeo(sk, nonblock);

	RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);

	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof (*usin) ||
		    usin->sin_family != AF_INET_OFFLOAD) {
			ret = -EINVAL;
			RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		mutex_enter(&sk->sk_lock);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		mutex_exit(&sk->sk_lock);
	}

	/* racing with another thread binding seems ok here */
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		ret = -ENOTCONN; /* XXX not a great errno */
		RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
		goto out;
	}

	rm = rdsv3_message_copy_from_user(uio, payload_len);
	if (IS_ERR(rm)) {
		ret = PTR_ERR(rm);
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_message_copy_from_user failed %d", -ret);
		rm = NULL;
		goto out;
	}

	rm->m_daddr = daddr;

	/* Parse any control messages the user may have included. */
	ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
		    rs, rm, msg, ret);
		goto out;
	}

	/*
	 * rdsv3_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot.
	 */
	mutex_enter(&rs->rs_conn_lock);
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
		conn = rs->rs_conn;
	} else {
		conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
		    daddr, rs->rs_transport, KM_NOSLEEP);
		if (IS_ERR(conn)) {
			mutex_exit(&rs->rs_conn_lock);
			ret = PTR_ERR(conn);
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "rdsv3_conn_create_outgoing failed %d",
			    -ret);
			goto out;
		}
		rs->rs_conn = conn;
	}
	mutex_exit(&rs->rs_conn_lock);

	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
	    conn->c_trans->xmit_rdma == NULL) {
		RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
		    rm->m_rdma_op, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	/*
	 * If the connection is down, trigger a connect.  We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
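	 * RDSV3_RECONNECT_PENDING is set atomically so that only one
	 * connect attempt gets queued on the work queue.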
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		mutex_enter(&rdsv3_poll_waitq.waitq_mutex);
		rs->rs_seen_congestion = 1;
		cv_signal(&rdsv3_poll_waitq.waitq_cv);
		mutex_exit(&rdsv3_poll_waitq.waitq_mutex);

		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
		goto out;
	}

	(void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
	    &queued);
	if (!queued) {
		/* rdsv3_stats_inc(s_send_queue_full); */
		/* XXX make sure this is reasonable */
		if (payload_len > rdsv3_sk_sndbuf(rs)) {
			ret = -EMSGSIZE;
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "msgsize(%d) too big, returning: %d",
			    payload_len, -ret);
			goto out;
		}
		if (nonblock) {
			ret = -EAGAIN;
			RDSV3_DPRINTF3("rdsv3_sendmsg",
			    "send queue full (%d), returning: %d",
			    payload_len, -ret);
			goto out;
		}

#if 0
		ret = rdsv3_wait_sig(sk->sk_sleep,
		    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)));
		if (ret == 0) {
			/* signal/timeout pending */
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "woke due to signal: %d", ret);
			ret = -ERESTART;
			goto out;
		}
#else
		mutex_enter(&sk->sk_sleep->waitq_mutex);
		sk->sk_sleep->waitq_waiters++;
		while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)) {
			ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
			    &sk->sk_sleep->waitq_mutex);
			if (ret == 0) {
				/* signal/timeout pending */
				RDSV3_DPRINTF2("rdsv3_sendmsg",
				    "woke due to signal: %d", ret);
				ret = -ERESTART;
				sk->sk_sleep->waitq_waiters--;
				mutex_exit(&sk->sk_sleep->waitq_mutex);
				goto out;
			}
		}
		sk->sk_sleep->waitq_waiters--;
		mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

		RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
		    queued);

		ASSERT(queued);
		ret = 0;
	}

	/*
	 * By now we've committed to the send.  We reuse rdsv3_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rdsv3_stats_inc(s_send_queued);

	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_send_worker(&conn->c_send_w.work);

	rdsv3_message_put(rm);
	RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
	    rs, payload_len);
	return (payload_len);

out:
	/*
	 * If the user included an RDMA_MAP cmsg, we allocated an MR on the
	 * fly.  If the sendmsg goes through, we keep the MR.  If it fails
	 * with EAGAIN or in any other way, we need to destroy the MR again.
	 */
	if (allocated_mr)
		rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
		    1);

	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}

/*
 * Reply to a ping packet.
 */
int
rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
{
	struct rdsv3_message *rm;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);

	rm = rdsv3_message_alloc(0, KM_NOSLEEP);
	if (rm == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = conn->c_faddr;

	/*
	 * If the connection is down, trigger a connect.
	 * We may have scheduled a delayed reconnect however - in this
	 * case we should not interfere.
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	mutex_enter(&conn->c_lock);
	list_insert_tail(&conn->c_send_queue, rm);
	set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	rdsv3_message_addref(rm);
	rm->m_inc.i_conn = conn;

	rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
	    conn->c_next_tx_seq);
	conn->c_next_tx_seq++;
	mutex_exit(&conn->c_lock);

	rdsv3_stats_inc(s_send_queued);
	rdsv3_stats_inc(s_send_pong);

	rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);
	rdsv3_message_put(rm);

	RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
	return (0);

out:
	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}