/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/stropts.h>
#include <sys/systm.h>

#include <sys/rds.h>
#include <sys/socket.h>
#include <sys/socketvar.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;

extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 */
void
rdsv3_send_reset(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_rdma_op *ro;

	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);

	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		ro = rm->m_rdma_op;
		if (ro && ro->r_mapped) {
			RDSV3_DPRINTF2("rdsv3_send_reset",
			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
			    rm, rm->m_flags, ro->r_mapped,
			    ro->r_rdma_sg[0].mihdl,
			    ro->r_rdma_sg[0].swr.wr_sgl);
			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
		}
		/*
		 * Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory
		 */
		rdsv3_message_unmapped(conn->c_xmit_rm);
		rdsv3_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;

	conn->c_map_queued = 0;

	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	mutex_enter(&conn->c_lock);
	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
			RDSV3_DPRINTF4("_send_reset",
			    "RT rm %p mflg 0x%x sgl %p",
			    rm, rm->m_flags,
			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
		}
	}
	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *  - tx queueing is a simple fifo list
 *  - reassembly is optional and easily done by transports per conn
 *  - no per flow rx lookup at all, straight to the socket
 *  - less per-frag memory and wire overhead
 * Con:
 *  - queued acks can be delayed behind large messages
 * Depends:
 *  - small message latency is higher behind queued large messages
 *  - large message latency isn't starved by intervening small sends
 */
int
rdsv3_send_xmit(struct rdsv3_connection *conn)
{
	struct rdsv3_message *rm;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct rdsv3_scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	list_t to_be_dropped;

	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);

	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.
	 * If another thread is already feeding the queue then we back off.
	 * This avoids blocking the caller and trading per-connection data
	 * between caches per message.
	 *
	 * The sem holder will issue a retry if they notice that someone
	 * queued a message after they stopped walking the send queue but
	 * before they dropped the sem.
	 */
	if (!mutex_tryenter(&conn->c_send_lock)) {
		RDSV3_DPRINTF4("rdsv3_send_xmit",
		    "Another thread running(conn: %p)", conn);
		rdsv3_stats_inc(s_send_sem_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if we need to send a congestion map update if we're
		 * between sending messages.  The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function. For all others, we allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
			    conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/*
		 * If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm != NULL &&
		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
		    conn->c_xmit_sg == rm->m_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;

			/* Release the reference to the previous message. */
			rdsv3_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so. */
		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map != NULL) {
				conn->c_map_offset = 0;
				conn->c_map_bytes =
				    sizeof (struct rdsv3_header) +
				    RDSV3_CONG_MAP_BYTES;
				continue;
			}

			rm = rdsv3_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem.. rdsv3_send_reset() is serialized with it.
		 */
		if (rm == NULL) {
			unsigned int len;

			mutex_enter(&conn->c_lock);

			if (!list_is_empty(&conn->c_send_queue)) {
				rm = list_remove_head(&conn->c_send_queue);
				rdsv3_message_addref(rm);

				/*
				 * Move the message from the send queue to
				 * the retransmit list right away.
				 */
				list_insert_tail(&conn->c_retrans, rm);
			}

			mutex_exit(&conn->c_lock);

			if (rm == NULL) {
				was_empty = 1;
				break;
			}

			/*
			 * Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with
			 * RDMA ops.
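			 * Such messages are instead moved onto the local
			 * to_be_dropped list and completed below with
			 * RDSV3_RDMA_DROPPED status.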
			 */
			if (rm->m_rdma_op &&
			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
				mutex_enter(&conn->c_lock);
				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
				    &rm->m_flags))
					list_remove_node(&rm->m_conn_item);
				list_insert_tail(&to_be_dropped, rm);
				mutex_exit(&conn->c_lock);
				rdsv3_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets =
				    rdsv3_sysctl_max_unacked_packets;
				conn->c_unacked_bytes =
				    rdsv3_sysctl_max_unacked_bytes;
				rdsv3_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/*
		 * Try and send an rdma message.  Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
		 */
		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;
			/*
			 * The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
		}

		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
		    conn->c_xmit_sg < rm->m_nents) {
			ret = conn->c_trans->xmit(conn, rm,
			    conn->c_xmit_hdr_off,
			    conn->c_xmit_sg,
			    conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off <
			    sizeof (struct rdsv3_header)) {
				tmp = min(ret,
				    sizeof (struct rdsv3_header) -
				    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->m_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min(ret, rdsv3_sg_len(sg) -
				    conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					ASSERT(!(ret != 0 &&
					    conn->c_xmit_sg == rm->m_nents));
				}
			}
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_is_empty(&to_be_dropped))
		rdsv3_send_remove_from_sock(&to_be_dropped, RDSV3_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock.  If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem.  If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty.  It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_exit(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
		/*
		 * We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
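		 * (-EAGAIN tells the caller that more work remains.)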
		 */
		ret = -EAGAIN;
	}

	if (ret == 0 && was_empty) {
		/*
		 * A simple bit test would be way faster than taking the
		 * spin lock
		 */
		mutex_enter(&conn->c_lock);
		if (!list_is_empty(&conn->c_send_queue)) {
			rdsv3_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		mutex_exit(&conn->c_lock);
	}

out:
	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
	    conn, ret);
	return (ret);
}

static void
rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
{
	uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);

	ASSERT(mutex_owned(&rs->rs_lock));

	ASSERT(rs->rs_snd_bytes >= len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rdsv3_stats_inc(s_send_queue_empty);
}

static inline int
rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
    is_acked_func is_acked)
{
	if (is_acked)
		return (is_acked(rm, ack));
	return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number greater than or equal to the given sequence
 * number.
 */
int
rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
{
	struct rdsv3_message *rm;
	int ret = 1;

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_retrans);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	/* XXX - original code spits out warning */
	rm = list_head(&conn->c_send_queue);
	if (ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
		ret = 0;

	mutex_exit(&conn->c_lock);

	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);

	return (ret);
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void
rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_rdma_op *ro;
	struct rdsv3_notifier *notifier;

	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);

	mutex_enter(&rm->m_rs_lock);

	ro = rm->m_rdma_op;
	if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
	    ro && ro->r_notify &&
	    (notifier = ro->r_notifier) != NULL) {
		ro->r_notifier = NULL;
		rs = rm->m_rs;
		rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));

		notifier->n_status = status;
		mutex_enter(&rs->rs_lock);
		list_insert_tail(&rs->rs_notify_queue, notifier);
		mutex_exit(&rs->rs_lock);
	}

	mutex_exit(&rm->m_rs_lock);

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
}

/*
 * This is the same as rdsv3_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    int status)
{
	struct rdsv3_rdma_op *ro;
	void *ic;

	RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
	    "Enter(rs: %p, rm: %p)", rs, rm);

	ro = rm->m_rdma_op;
	if (ro && ro->r_notify && ro->r_notifier) {
		ro->r_notifier->n_status = status;
		list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
		ro->r_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * a RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 */
struct rdsv3_message *
rdsv3_send_get_message(struct rdsv3_connection *conn,
    struct rdsv3_rdma_op *op)
{
	struct rdsv3_message *rm, *tmp, *found = NULL;

	RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			goto out;
		}
	}

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
	    m_conn_item) {
		if (rm->m_rdma_op == op) {
			atomic_add_32(&rm->m_refcount, 1);
			found = rm;
			break;
		}
	}

out:
	mutex_exit(&conn->c_lock);

	return (found);
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
void
rdsv3_send_remove_from_sock(struct list *messages, int status)
{
	struct rdsv3_sock *rs = NULL;
	struct rdsv3_message *rm;

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");

	while (!list_is_empty(messages)) {
		rm = list_remove_head(messages);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		mutex_enter(&rm->m_rs_lock);
		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rdsv3_wake_sk_sleep(rs);
				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
		}

		mutex_enter(&rs->rs_lock);
		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
			struct rdsv3_notifier *notifier;

			list_remove_node(&rm->m_sock_item);
			rdsv3_send_sndbuf_remove(rs, rm);

			if (ro &&
			    (notifier = ro->r_notifier) != NULL &&
			    (status || ro->r_notify)) {
				list_insert_tail(&rs->rs_notify_queue,
				    notifier);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->m_rdma_op->r_notifier = NULL;
			}
			rdsv3_message_put(rm);
			rm->m_rs = NULL;
		}
		mutex_exit(&rs->rs_lock);

unlock_and_drop:
		mutex_exit(&rm->m_rs_lock);
		rdsv3_message_put(rm);
	}

	if (rs) {
		rdsv3_wake_sk_sleep(rs);
		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
	}

	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void
rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
    is_acked_func is_acked)
{
	struct rdsv3_message *rm, *tmp;
	list_t list;

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_conn_item));

	mutex_enter(&conn->c_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rdsv3_send_is_acked(rm, ack, is_acked))
			break;

		list_remove_node(&rm->m_conn_item);
		list_insert_tail(&list, rm);
		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	}

#if 0
XXX
	/* order flag updates with spin locks */
	if (!list_is_empty(&list))
		smp_mb__after_clear_bit();
#endif

	mutex_exit(&conn->c_lock);

	/* now remove the messages from the sock list as needed */
	rdsv3_send_remove_from_sock(&list, RDSV3_RDMA_SUCCESS);

	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
}

void
rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
{
	struct rdsv3_message *rm, *tmp;
	struct rdsv3_connection *conn;
	list_t list;
	int wake = 0;

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);

	list_create(&list, sizeof (struct rdsv3_message),
	    offsetof(struct rdsv3_message, m_sock_item));

	/* get all the messages we're dropping under the rs lock */
	mutex_enter(&rs->rs_lock);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
	    m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		wake = 1;
		list_remove(&rs->rs_send_queue, rm);
		list_insert_tail(&list, rm);
		rdsv3_send_sndbuf_remove(rs, rm);
		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
	}

	mutex_exit(&rs->rs_lock);

	conn = NULL;

	/* now remove the messages from the conn list as needed */
	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
		/*
		 * We do this here rather than in the loop above, so that
		 * we don't have to nest m_rs_lock under rs->rs_lock
		 */
		mutex_enter(&rm->m_rs_lock);
		/* If this is a RDMA operation, notify the app. */
		__rdsv3_rdma_send_complete(rs, rm, RDSV3_RDMA_CANCELED);
		rm->m_rs = NULL;
		mutex_exit(&rm->m_rs_lock);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the conn.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 */
		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
			continue;

		if (conn != rm->m_inc.i_conn) {
			if (conn)
				mutex_exit(&conn->c_lock);
			conn = rm->m_inc.i_conn;
			mutex_enter(&conn->c_lock);
		}

		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
			list_remove_node(&rm->m_conn_item);
			rdsv3_message_put(rm);
		}
	}

	if (conn)
		mutex_exit(&conn->c_lock);

	if (wake)
		rdsv3_wake_sk_sleep(rs);

	while (!list_is_empty(&list)) {
		rm = list_remove_head(&list);

		rdsv3_message_wait(rm);
		rdsv3_message_put(rm);
	}

	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
}

/*
 * we only want this to fire once so we use the callers 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDSV3_CANCEL_SENT_TO.
 */
static int
rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
    struct rdsv3_message *rm, uint16_be_t sport,
    uint16_be_t dport, int *queued)
{
	uint32_t len;

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);

	if (*queued)
		goto out;

	len = ntohl(rm->m_inc.i_hdr.h_len);

	/*
	 * this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock
	 */
	mutex_enter(&rs->rs_lock);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/*
		 * let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
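		 * The threshold used below is half of the socket send buffer.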
		 */
		if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
			set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);

		list_insert_tail(&rs->rs_send_queue, rm);
		set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);

		rdsv3_message_addref(rm);
		rm->m_rs = rs;

		/*
		 * The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock
		 */
		rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
		    dport, 0);
		rm->m_inc.i_conn = conn;
		rdsv3_message_addref(rm);	/* XXX - called twice */

		mutex_enter(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
		list_insert_tail(&conn->c_send_queue, rm);
		set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
		mutex_exit(&conn->c_lock);

		RDSV3_DPRINTF5("rdsv3_send_queue_rm",
		    "queued msg %p len %d, rs %p bytes %d seq %llu",
		    rm, len, rs, rs->rs_snd_bytes,
		    (unsigned long long)ntohll(
		    rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	mutex_exit(&rs->rs_lock);

	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
out:
	return (*queued);
}

static int
rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
    struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
		    cmsg, rm, cmsg->cmsg_type);
		/*
		 * As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->m_rdma_cookie and rm->m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDSV3_CMSG_RDMA_ARGS:
			ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDSV3_CMSG_RDMA_DEST:
			ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDSV3_CMSG_RDMA_MAP:
			ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
			if (ret)
				*allocated_mr = 1;
			break;

		default:
			return (-EINVAL);
		}

		if (ret)
			break;
	}

	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);

	return (ret);
}

int
rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
    size_t payload_len)
{
	struct rsock *sk = rdsv3_rs_to_sk(rs);
	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
	uint32_be_t daddr;
	uint16_be_t dport;
	struct rdsv3_message *rm = NULL;
	struct rdsv3_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = rdsv3_rcvtimeo(sk, nonblock);

	RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);

	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof (*usin) ||
		    usin->sin_family != AF_INET_OFFLOAD) {
			ret = -EINVAL;
			RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		mutex_enter(&sk->sk_lock);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		mutex_exit(&sk->sk_lock);
	}

	/* racing with another thread binding seems ok here */
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		ret = -ENOTCONN; /* XXX not a great errno */
		RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
		goto out;
	}

	rm = rdsv3_message_copy_from_user(uio, payload_len);
	if (IS_ERR(rm)) {
		ret = PTR_ERR(rm);
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_message_copy_from_user failed %d", -ret);
		rm = NULL;
		goto out;
	}

	rm->m_daddr = daddr;

	/*
	 * rdsv3_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot.
	 */
	mutex_enter(&rs->rs_conn_lock);
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
		conn = rs->rs_conn;
	} else {
		conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
		    daddr, rs->rs_transport, KM_NOSLEEP);
		if (IS_ERR(conn)) {
			mutex_exit(&rs->rs_conn_lock);
			ret = PTR_ERR(conn);
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "rdsv3_conn_create_outgoing failed %d",
			    -ret);
			goto out;
		}
		rs->rs_conn = conn;
	}
	mutex_exit(&rs->rs_conn_lock);

	/* Parse any control messages the user may have included. */
	ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
		    rs, rm, msg, ret);
		goto out;
	}

	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
	    conn->c_trans->xmit_rdma == NULL) {
		RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
		    rm->m_rdma_op, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	/*
	 * If the connection is down, trigger a connect. We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
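	 * The RDSV3_RECONNECT_PENDING bit ensures that only one connect
	 * worker is queued at a time.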
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_sendmsg",
		    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
		goto out;
	}

	(void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
	    &queued);
	if (!queued) {
		/* rdsv3_stats_inc(s_send_queue_full); */
		/* XXX make sure this is reasonable */
		if (payload_len > rdsv3_sk_sndbuf(rs)) {
			ret = -EMSGSIZE;
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "msgsize(%d) too big, returning: %d",
			    payload_len, -ret);
			goto out;
		}
		if (nonblock) {
			ret = -EAGAIN;
			RDSV3_DPRINTF3("rdsv3_sendmsg",
			    "send queue full (%d), returning: %d",
			    payload_len, -ret);
			goto out;
		}

#if 0
		ret = rdsv3_wait_sig(sk->sk_sleep,
		    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)));
		if (ret == 0) {
			/* signal/timeout pending */
			RDSV3_DPRINTF2("rdsv3_sendmsg",
			    "woke due to signal: %d", ret);
			ret = -ERESTART;
			goto out;
		}
#else
		mutex_enter(&sk->sk_sleep->waitq_mutex);
		sk->sk_sleep->waitq_waiters++;
		while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
		    dport, &queued)) {
			ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
			    &sk->sk_sleep->waitq_mutex);
			if (ret == 0) {
				/* signal/timeout pending */
				RDSV3_DPRINTF2("rdsv3_sendmsg",
				    "woke due to signal: %d", ret);
				ret = -ERESTART;
				sk->sk_sleep->waitq_waiters--;
				mutex_exit(&sk->sk_sleep->waitq_mutex);
				goto out;
			}
		}
		sk->sk_sleep->waitq_waiters--;
		mutex_exit(&sk->sk_sleep->waitq_mutex);
#endif

		RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
		    queued);

		ASSERT(queued);
		ret = 0;
	}

	/*
	 * By now we've committed to the send.  We reuse rdsv3_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rdsv3_stats_inc(s_send_queued);

	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_send_worker(&conn->c_send_w.work);

	rdsv3_message_put(rm);
	RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
	    rs, payload_len);
	return (payload_len);

out:
	/*
	 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again
	 */
	if (allocated_mr)
		rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
		    1);

	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}

/*
 * Reply to a ping packet.
 */
int
rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
{
	struct rdsv3_message *rm;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);

	rm = rdsv3_message_alloc(0, KM_NOSLEEP);
	if (rm == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = conn->c_faddr;

	/*
	 * If the connection is down, trigger a connect. We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
	 */
	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);

	ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	mutex_enter(&conn->c_lock);
	list_insert_tail(&conn->c_send_queue, rm);
	set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
	rdsv3_message_addref(rm);
	rm->m_inc.i_conn = conn;

	rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
	    conn->c_next_tx_seq);
	conn->c_next_tx_seq++;
	mutex_exit(&conn->c_lock);

	rdsv3_stats_inc(s_send_queued);
	rdsv3_stats_inc(s_send_pong);

	rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);
	rdsv3_message_put(rm);

	RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
	return (0);

out:
	if (rm)
		rdsv3_message_put(rm);
	return (ret);
}