/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/stream.h>
#include <sys/strsun.h>
#include <sys/strsubr.h>
#include <sys/debug.h>
#include <sys/sdt.h>
#include <sys/cmn_err.h>
#include <sys/tihdr.h>

#include <inet/common.h>
#include <inet/optcom.h>
#include <inet/ip.h>
#include <inet/ip_impl.h>
#include <inet/tcp.h>
#include <inet/tcp_impl.h>
#include <inet/ipsec_impl.h>
#include <inet/ipclassifier.h>
#include <inet/ipp_common.h>
#include <inet/ip_if.h>

/*
 * This file implements TCP fusion - a protocol-less data path for TCP
 * loopback connections. The fusion of two local TCP endpoints occurs
 * at connection establishment time. Various conditions (see details
 * in tcp_fuse()) need to be met for fusion to be successful. If it
 * fails, we fall back to the regular TCP data path; if it succeeds,
 * both endpoints proceed to use tcp_fuse_output() as the transmit path.
 * tcp_fuse_output() enqueues application data directly onto the peer's
 * receive queue; no protocol processing is involved. After enqueueing
 * the data, the sender can either push (putnext) the data up the
 * receiver's read queue, or simply return and let the receiver retrieve
 * the enqueued data via the synchronous streams entry point
 * tcp_fuse_rrw(). The latter path is taken if synchronous streams is
 * enabled (the default). It is disabled if sockfs no longer resides
 * directly on top of the tcp module due to a module insertion or removal.
 * It also needs to be temporarily disabled when sending urgent data,
 * because the tcp_fuse_rrw() path bypasses the M_PROTO processing done
 * by the strsock_proto() hook.
 *
 * Synchronization is handled by squeue and the mutex tcp_non_sq_lock.
 * One of the requirements for fusion to succeed is that both endpoints
 * need to be using the same squeue. This ensures that neither side
 * can disappear while the other side is still sending data. By itself,
 * the squeue is not sufficient for guaranteeing safety when synchronous
 * streams is enabled. The reason is that tcp_fuse_rrw() doesn't enter
 * the squeue, and its access to tcp_rcv_list and other fusion-related
 * fields needs to be synchronized with the sender. tcp_non_sq_lock is
 * used for this purpose. When there is urgent data, the sender needs
 * to push the data up the receiver's streams read queue.
In order to 75 * avoid holding the tcp_non_sq_lock across putnext(), the sender sets 76 * the peer tcp's tcp_fuse_syncstr_plugged bit and releases tcp_non_sq_lock 77 * (see macro TCP_FUSE_SYNCSTR_PLUG_DRAIN()). If tcp_fuse_rrw() enters 78 * after this point, it will see that synchronous streams is plugged and 79 * will wait on tcp_fuse_plugcv. After the sender has finished pushing up 80 * all urgent data, it will clear the tcp_fuse_syncstr_plugged bit using 81 * TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(). This will cause any threads waiting 82 * on tcp_fuse_plugcv to return EBUSY, and in turn cause strget() to call 83 * getq_noenab() to dequeue data from the stream head instead. Once the 84 * data on the stream head has been consumed, tcp_fuse_rrw() may again 85 * be used to process tcp_rcv_list. However, if TCP_FUSE_SYNCSTR_STOP() 86 * has been called, all future calls to tcp_fuse_rrw() will return EBUSY, 87 * effectively disabling synchronous streams. 88 * 89 * The following note applies only to the synchronous streams mode. 90 * 91 * Flow control is done by checking the size of receive buffer and 92 * the number of data blocks, both set to different limits. This is 93 * different than regular streams flow control where cumulative size 94 * check dominates block count check -- streams queue high water mark 95 * typically represents bytes. Each enqueue triggers notifications 96 * to the receiving process; a build up of data blocks indicates a 97 * slow receiver and the sender should be blocked or informed at the 98 * earliest moment instead of further wasting system resources. In 99 * effect, this is equivalent to limiting the number of outstanding 100 * segments in flight. 101 */ 102 103 /* 104 * Setting this to false means we disable fusion altogether and 105 * loopback connections would go through the protocol paths. 106 */ 107 boolean_t do_tcp_fusion = B_TRUE; 108 109 /* 110 * Enabling this flag allows sockfs to retrieve data directly 111 * from a fused tcp endpoint using synchronous streams interface. 112 */ 113 boolean_t do_tcp_direct_sockfs = B_TRUE; 114 115 /* 116 * This is the minimum amount of outstanding writes allowed on 117 * a synchronous streams-enabled receiving endpoint before the 118 * sender gets flow-controlled. Setting this value to 0 means 119 * that the data block limit is equivalent to the byte count 120 * limit, which essentially disables the check. 121 */ 122 #define TCP_FUSION_RCV_UNREAD_MIN 8 123 uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN; 124 125 static void tcp_fuse_syncstr_enable(tcp_t *); 126 static void tcp_fuse_syncstr_disable(tcp_t *); 127 static boolean_t strrput_sig(queue_t *, boolean_t); 128 129 /* 130 * Return true if this connection needs some IP functionality 131 */ 132 static boolean_t 133 tcp_loopback_needs_ip(tcp_t *tcp, netstack_t *ns) 134 { 135 ipsec_stack_t *ipss = ns->netstack_ipsec; 136 137 /* 138 * If ire is not cached, do not use fusion 139 */ 140 if (tcp->tcp_connp->conn_ire_cache == NULL) { 141 /* 142 * There is no need to hold conn_lock here because when called 143 * from tcp_fuse() there can be no window where conn_ire_cache 144 * can change. This is not true when called from 145 * tcp_fuse_output() as conn_ire_cache can become null just 146 * after the check. It will be necessary to recheck for a NULL 147 * conn_ire_cache in tcp_fuse_output() to avoid passing a 148 * stale ill pointer to FW_HOOKS. 
         */
        return (B_TRUE);
    }
    if (tcp->tcp_ipversion == IPV4_VERSION) {
        if (tcp->tcp_ip_hdr_len != IP_SIMPLE_HDR_LENGTH)
            return (B_TRUE);
        if (CONN_OUTBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
            return (B_TRUE);
        if (CONN_INBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
            return (B_TRUE);
    } else {
        if (tcp->tcp_ip_hdr_len != IPV6_HDR_LEN)
            return (B_TRUE);
        if (CONN_OUTBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
            return (B_TRUE);
        if (CONN_INBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
            return (B_TRUE);
    }
    if (!CONN_IS_LSO_MD_FASTPATH(tcp->tcp_connp))
        return (B_TRUE);
    return (B_FALSE);
}


/*
 * This routine gets called by the eager tcp upon changing state from
 * SYN_RCVD to ESTABLISHED. It fuses a direct path between itself
 * and the active connect tcp such that the regular tcp processing
 * may be bypassed under allowable circumstances. Because the fusion
 * requires both endpoints to be in the same squeue, it does not work
 * for simultaneous active connects because there is no easy way to
 * switch from one squeue to another once the connection is created.
 * This is different from the eager tcp case where we assign it the
 * same squeue as the one given to the active connect tcp during open.
 */
void
tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcph_t *tcph)
{
    conn_t *peer_connp, *connp = tcp->tcp_connp;
    tcp_t *peer_tcp;
    tcp_stack_t *tcps = tcp->tcp_tcps;
    netstack_t *ns;
    ip_stack_t *ipst = tcps->tcps_netstack->netstack_ip;

    ASSERT(!tcp->tcp_fused);
    ASSERT(tcp->tcp_loopback);
    ASSERT(tcp->tcp_loopback_peer == NULL);
    /*
     * We need to inherit q_hiwat of the listener tcp, but we can't
     * really use tcp_listener since we get here after sending up
     * T_CONN_IND and tcp_wput_accept() may be called independently,
     * at which point tcp_listener is cleared; this is why we use
     * tcp_saved_listener. The listener itself is guaranteed to be
     * around until tcp_accept_finish() is called on this eager --
     * this won't happen until we're done since we're inside the
     * eager's perimeter now.
     *
     * We can also get called in the case where a connection needs
     * to be re-fused. In this case tcp_saved_listener will be
     * NULL but tcp_refuse will be true.
     */
    ASSERT(tcp->tcp_saved_listener != NULL || tcp->tcp_refuse);
    /*
     * Lookup the peer endpoint; search for the remote endpoint having
     * the reversed address-port quadruplet in ESTABLISHED state,
     * which is guaranteed to be unique in the system. Zone check
     * is applied accordingly for loopback address, but not for
     * local address since we want fusion to happen across Zones.
     */
    if (tcp->tcp_ipversion == IPV4_VERSION) {
        peer_connp = ipcl_conn_tcp_lookup_reversed_ipv4(connp,
            (ipha_t *)iphdr, tcph, ipst);
    } else {
        peer_connp = ipcl_conn_tcp_lookup_reversed_ipv6(connp,
            (ip6_t *)iphdr, tcph, ipst);
    }

    /*
     * We can only proceed if the peer exists, resides in the same
     * squeue as our conn and is not a raw socket. The squeue assignment
     * of this eager tcp was done earlier at the time of SYN processing
     * in ip_fanout_tcp{_v6}. Note that being on the same squeue by
     * itself doesn't guarantee a safe condition to fuse, hence we
     * perform additional tests below.
233 */ 234 ASSERT(peer_connp == NULL || peer_connp != connp); 235 if (peer_connp == NULL || peer_connp->conn_sqp != connp->conn_sqp || 236 !IPCL_IS_TCP(peer_connp)) { 237 if (peer_connp != NULL) { 238 TCP_STAT(tcps, tcp_fusion_unqualified); 239 CONN_DEC_REF(peer_connp); 240 } 241 return; 242 } 243 peer_tcp = peer_connp->conn_tcp; /* active connect tcp */ 244 245 ASSERT(peer_tcp != NULL && peer_tcp != tcp && !peer_tcp->tcp_fused); 246 ASSERT(peer_tcp->tcp_loopback && peer_tcp->tcp_loopback_peer == NULL); 247 ASSERT(peer_connp->conn_sqp == connp->conn_sqp); 248 249 /* 250 * Fuse the endpoints; we perform further checks against both 251 * tcp endpoints to ensure that a fusion is allowed to happen. 252 * In particular we bail out for non-simple TCP/IP or if IPsec/ 253 * IPQoS policy/kernel SSL exists. 254 */ 255 ns = tcps->tcps_netstack; 256 ipst = ns->netstack_ip; 257 258 if (!tcp->tcp_unfusable && !peer_tcp->tcp_unfusable && 259 !tcp_loopback_needs_ip(tcp, ns) && 260 !tcp_loopback_needs_ip(peer_tcp, ns) && 261 tcp->tcp_kssl_ent == NULL && 262 !IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst)) { 263 mblk_t *mp; 264 queue_t *peer_rq = peer_tcp->tcp_rq; 265 266 ASSERT(!TCP_IS_DETACHED(peer_tcp)); 267 ASSERT(tcp->tcp_fused_sigurg_mp == NULL); 268 ASSERT(peer_tcp->tcp_fused_sigurg_mp == NULL); 269 ASSERT(tcp->tcp_kssl_ctx == NULL); 270 271 /* 272 * We need to drain data on both endpoints during unfuse. 273 * If we need to send up SIGURG at the time of draining, 274 * we want to be sure that an mblk is readily available. 275 * This is why we pre-allocate the M_PCSIG mblks for both 276 * endpoints which will only be used during/after unfuse. 277 */ 278 if (!IPCL_IS_NONSTR(tcp->tcp_connp)) { 279 if ((mp = allocb(1, BPRI_HI)) == NULL) 280 goto failed; 281 282 tcp->tcp_fused_sigurg_mp = mp; 283 } 284 285 if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) { 286 if ((mp = allocb(1, BPRI_HI)) == NULL) 287 goto failed; 288 289 peer_tcp->tcp_fused_sigurg_mp = mp; 290 } 291 292 if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp) && 293 (mp = allocb(sizeof (struct stroptions), 294 BPRI_HI)) == NULL) { 295 goto failed; 296 } 297 298 /* If either tcp or peer_tcp sodirect enabled then disable */ 299 if (tcp->tcp_sodirect != NULL) { 300 mutex_enter(tcp->tcp_sodirect->sod_lockp); 301 SOD_DISABLE(tcp->tcp_sodirect); 302 mutex_exit(tcp->tcp_sodirect->sod_lockp); 303 tcp->tcp_sodirect = NULL; 304 } 305 if (peer_tcp->tcp_sodirect != NULL) { 306 mutex_enter(peer_tcp->tcp_sodirect->sod_lockp); 307 SOD_DISABLE(peer_tcp->tcp_sodirect); 308 mutex_exit(peer_tcp->tcp_sodirect->sod_lockp); 309 peer_tcp->tcp_sodirect = NULL; 310 } 311 312 /* Fuse both endpoints */ 313 peer_tcp->tcp_loopback_peer = tcp; 314 tcp->tcp_loopback_peer = peer_tcp; 315 peer_tcp->tcp_fused = tcp->tcp_fused = B_TRUE; 316 317 /* 318 * We never use regular tcp paths in fusion and should 319 * therefore clear tcp_unsent on both endpoints. Having 320 * them set to non-zero values means asking for trouble 321 * especially after unfuse, where we may end up sending 322 * through regular tcp paths which expect xmit_list and 323 * friends to be correctly setup. 324 */ 325 peer_tcp->tcp_unsent = tcp->tcp_unsent = 0; 326 327 tcp_timers_stop(tcp); 328 tcp_timers_stop(peer_tcp); 329 330 /* 331 * At this point we are a detached eager tcp and therefore 332 * don't have a queue assigned to us until accept happens. 
         * In the meantime the peer endpoint may immediately send
         * us data as soon as fusion is finished, and we need to be
         * able to flow control it in case it sends down a huge amount
         * of data while we're still detached. To prevent that we
         * inherit the listener's recv_hiwater value; this is temporary
         * since we'll repeat the process in tcp_accept_finish().
         */
        if (!tcp->tcp_refuse) {
            (void) tcp_fuse_set_rcv_hiwat(tcp,
                tcp->tcp_saved_listener->tcp_recv_hiwater);

            /*
             * Set the stream head's write offset value to zero
             * since we won't be needing any room for TCP/IP
             * headers; tell it not to break up the writes (this
             * would reduce the amount of work done by kmem); and
             * configure our receive buffer. Note that we can only
             * do this for the active connect tcp since our eager is
             * still detached; it will be dealt with later in
             * tcp_accept_finish().
             */
            if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) {
                struct stroptions *stropt;

                DB_TYPE(mp) = M_SETOPTS;
                mp->b_wptr += sizeof (*stropt);

                stropt = (struct stroptions *)mp->b_rptr;
                stropt->so_flags = SO_MAXBLK|SO_WROFF|SO_HIWAT;
                stropt->so_maxblk = tcp_maxpsz_set(peer_tcp,
                    B_FALSE);
                stropt->so_wroff = 0;

                /*
                 * Record the stream head's high water mark for
                 * the peer endpoint; this is used for flow-control
                 * purposes in tcp_fuse_output().
                 */
                stropt->so_hiwat = tcp_fuse_set_rcv_hiwat(
                    peer_tcp, peer_rq->q_hiwat);

                tcp->tcp_refuse = B_FALSE;
                peer_tcp->tcp_refuse = B_FALSE;
                /* Send the options up */
                putnext(peer_rq, mp);
            } else {
                struct sock_proto_props sopp;

                /* The peer is a non-STREAMS endpoint */
                ASSERT(IPCL_IS_TCP(peer_connp));

                (void) tcp_fuse_set_rcv_hiwat(tcp,
                    tcp->tcp_saved_listener->tcp_recv_hiwater);

                sopp.sopp_flags = SOCKOPT_MAXBLK |
                    SOCKOPT_WROFF | SOCKOPT_RCVHIWAT;
                sopp.sopp_maxblk = tcp_maxpsz_set(peer_tcp,
                    B_FALSE);
                sopp.sopp_wroff = 0;
                sopp.sopp_rxhiwat = tcp_fuse_set_rcv_hiwat(
                    peer_tcp, peer_tcp->tcp_recv_hiwater);
                (*peer_connp->conn_upcalls->su_set_proto_props)
                    (peer_connp->conn_upper_handle, &sopp);
            }
        }
        tcp->tcp_refuse = B_FALSE;
        peer_tcp->tcp_refuse = B_FALSE;
    } else {
        TCP_STAT(tcps, tcp_fusion_unqualified);
    }
    CONN_DEC_REF(peer_connp);
    return;

failed:
    if (tcp->tcp_fused_sigurg_mp != NULL) {
        freeb(tcp->tcp_fused_sigurg_mp);
        tcp->tcp_fused_sigurg_mp = NULL;
    }
    if (peer_tcp->tcp_fused_sigurg_mp != NULL) {
        freeb(peer_tcp->tcp_fused_sigurg_mp);
        peer_tcp->tcp_fused_sigurg_mp = NULL;
    }
    CONN_DEC_REF(peer_connp);
}

/*
 * Unfuse a previously-fused pair of tcp loopback endpoints.
 */
void
tcp_unfuse(tcp_t *tcp)
{
    tcp_t *peer_tcp = tcp->tcp_loopback_peer;

    ASSERT(tcp->tcp_fused && peer_tcp != NULL);
    ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
    ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
    ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);

    /*
     * We disable synchronous streams, drain any queued data and
     * clear tcp_direct_sockfs. The synchronous streams entry
     * points will become no-ops after this point.
435 */ 436 tcp_fuse_disable_pair(tcp, B_TRUE); 437 438 /* 439 * Update th_seq and th_ack in the header template 440 */ 441 U32_TO_ABE32(tcp->tcp_snxt, tcp->tcp_tcph->th_seq); 442 U32_TO_ABE32(tcp->tcp_rnxt, tcp->tcp_tcph->th_ack); 443 U32_TO_ABE32(peer_tcp->tcp_snxt, peer_tcp->tcp_tcph->th_seq); 444 U32_TO_ABE32(peer_tcp->tcp_rnxt, peer_tcp->tcp_tcph->th_ack); 445 446 /* Unfuse the endpoints */ 447 peer_tcp->tcp_fused = tcp->tcp_fused = B_FALSE; 448 peer_tcp->tcp_loopback_peer = tcp->tcp_loopback_peer = NULL; 449 if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) { 450 ASSERT(peer_tcp->tcp_fused_sigurg_mp != NULL); 451 freeb(peer_tcp->tcp_fused_sigurg_mp); 452 peer_tcp->tcp_fused_sigurg_mp = NULL; 453 } 454 if (!IPCL_IS_NONSTR(tcp->tcp_connp)) { 455 ASSERT(tcp->tcp_fused_sigurg_mp != NULL); 456 freeb(tcp->tcp_fused_sigurg_mp); 457 tcp->tcp_fused_sigurg_mp = NULL; 458 } 459 } 460 461 /* 462 * Fusion output routine for urgent data. This routine is called by 463 * tcp_fuse_output() for handling non-M_DATA mblks. 464 */ 465 void 466 tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp) 467 { 468 mblk_t *mp1; 469 struct T_exdata_ind *tei; 470 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 471 mblk_t *head, *prev_head = NULL; 472 tcp_stack_t *tcps = tcp->tcp_tcps; 473 474 ASSERT(tcp->tcp_fused); 475 ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp); 476 ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO); 477 ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA); 478 ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0); 479 480 /* 481 * Urgent data arrives in the form of T_EXDATA_REQ from above. 482 * Each occurence denotes a new urgent pointer. For each new 483 * urgent pointer we signal (SIGURG) the receiving app to indicate 484 * that it needs to go into urgent mode. This is similar to the 485 * urgent data handling in the regular tcp. We don't need to keep 486 * track of where the urgent pointer is, because each T_EXDATA_REQ 487 * "advances" the urgent pointer for us. 488 * 489 * The actual urgent data carried by T_EXDATA_REQ is then prepended 490 * by a T_EXDATA_IND before being enqueued behind any existing data 491 * destined for the receiving app. There is only a single urgent 492 * pointer (out-of-band mark) for a given tcp. If the new urgent 493 * data arrives before the receiving app reads some existing urgent 494 * data, the previous marker is lost. This behavior is emulated 495 * accordingly below, by removing any existing T_EXDATA_IND messages 496 * and essentially converting old urgent data into non-urgent. 497 */ 498 ASSERT(tcp->tcp_valid_bits & TCP_URG_VALID); 499 /* Let sender get out of urgent mode */ 500 tcp->tcp_valid_bits &= ~TCP_URG_VALID; 501 502 /* 503 * This flag indicates that a signal needs to be sent up. 504 * This flag will only get cleared once SIGURG is delivered and 505 * is not affected by the tcp_fused flag -- delivery will still 506 * happen even after an endpoint is unfused, to handle the case 507 * where the sending endpoint immediately closes/unfuses after 508 * sending urgent data and the accept is not yet finished. 
509 */ 510 peer_tcp->tcp_fused_sigurg = B_TRUE; 511 512 /* Reuse T_EXDATA_REQ mblk for T_EXDATA_IND */ 513 DB_TYPE(mp) = M_PROTO; 514 tei = (struct T_exdata_ind *)mp->b_rptr; 515 tei->PRIM_type = T_EXDATA_IND; 516 tei->MORE_flag = 0; 517 mp->b_wptr = (uchar_t *)&tei[1]; 518 519 TCP_STAT(tcps, tcp_fusion_urg); 520 BUMP_MIB(&tcps->tcps_mib, tcpOutUrg); 521 522 head = peer_tcp->tcp_rcv_list; 523 while (head != NULL) { 524 /* 525 * Remove existing T_EXDATA_IND, keep the data which follows 526 * it and relink our list. Note that we don't modify the 527 * tcp_rcv_last_tail since it never points to T_EXDATA_IND. 528 */ 529 if (DB_TYPE(head) != M_DATA) { 530 mp1 = head; 531 532 ASSERT(DB_TYPE(mp1->b_cont) == M_DATA); 533 head = mp1->b_cont; 534 mp1->b_cont = NULL; 535 head->b_next = mp1->b_next; 536 mp1->b_next = NULL; 537 if (prev_head != NULL) 538 prev_head->b_next = head; 539 if (peer_tcp->tcp_rcv_list == mp1) 540 peer_tcp->tcp_rcv_list = head; 541 if (peer_tcp->tcp_rcv_last_head == mp1) 542 peer_tcp->tcp_rcv_last_head = head; 543 freeb(mp1); 544 } 545 prev_head = head; 546 head = head->b_next; 547 } 548 } 549 550 /* 551 * Fusion output routine, called by tcp_output() and tcp_wput_proto(). 552 * If we are modifying any member that can be changed outside the squeue, 553 * like tcp_flow_stopped, we need to take tcp_non_sq_lock. 554 */ 555 boolean_t 556 tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size) 557 { 558 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 559 uint_t max_unread; 560 boolean_t flow_stopped, peer_data_queued = B_FALSE; 561 boolean_t urgent = (DB_TYPE(mp) != M_DATA); 562 boolean_t push = B_FALSE; 563 mblk_t *mp1 = mp; 564 ill_t *ilp, *olp; 565 ipif_t *iifp, *oifp; 566 ipha_t *ipha; 567 ip6_t *ip6h; 568 tcph_t *tcph; 569 uint_t ip_hdr_len; 570 uint32_t seq; 571 uint32_t recv_size = send_size; 572 tcp_stack_t *tcps = tcp->tcp_tcps; 573 netstack_t *ns = tcps->tcps_netstack; 574 ip_stack_t *ipst = ns->netstack_ip; 575 576 ASSERT(tcp->tcp_fused); 577 ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp); 578 ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp); 579 ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO || 580 DB_TYPE(mp) == M_PCPROTO); 581 582 /* If this connection requires IP, unfuse and use regular path */ 583 if (tcp_loopback_needs_ip(tcp, ns) || 584 tcp_loopback_needs_ip(peer_tcp, ns) || 585 IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst) || 586 list_head(&ipst->ips_ipobs_cb_list) != NULL) { 587 TCP_STAT(tcps, tcp_fusion_aborted); 588 tcp->tcp_refuse = B_TRUE; 589 peer_tcp->tcp_refuse = B_TRUE; 590 591 bcopy(peer_tcp->tcp_tcph, &tcp->tcp_saved_tcph, 592 sizeof (tcph_t)); 593 bcopy(tcp->tcp_tcph, &peer_tcp->tcp_saved_tcph, 594 sizeof (tcph_t)); 595 if (tcp->tcp_ipversion == IPV4_VERSION) { 596 bcopy(peer_tcp->tcp_ipha, &tcp->tcp_saved_ipha, 597 sizeof (ipha_t)); 598 bcopy(tcp->tcp_ipha, &peer_tcp->tcp_saved_ipha, 599 sizeof (ipha_t)); 600 } else { 601 bcopy(peer_tcp->tcp_ip6h, &tcp->tcp_saved_ip6h, 602 sizeof (ip6_t)); 603 bcopy(tcp->tcp_ip6h, &peer_tcp->tcp_saved_ip6h, 604 sizeof (ip6_t)); 605 } 606 goto unfuse; 607 } 608 609 if (send_size == 0) { 610 freemsg(mp); 611 return (B_TRUE); 612 } 613 max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater; 614 615 /* 616 * Handle urgent data; we either send up SIGURG to the peer now 617 * or do it later when we drain, in case the peer is detached 618 * or if we're short of memory for M_PCSIG mblk. 
619 */ 620 if (urgent) { 621 /* 622 * We stop synchronous streams when we have urgent data 623 * queued to prevent tcp_fuse_rrw() from pulling it. If 624 * for some reasons the urgent data can't be delivered 625 * below, synchronous streams will remain stopped until 626 * someone drains the tcp_rcv_list. 627 */ 628 TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp); 629 tcp_fuse_output_urg(tcp, mp); 630 631 mp1 = mp->b_cont; 632 } 633 634 if (tcp->tcp_ipversion == IPV4_VERSION && 635 (HOOKS4_INTERESTED_LOOPBACK_IN(ipst) || 636 HOOKS4_INTERESTED_LOOPBACK_OUT(ipst)) || 637 tcp->tcp_ipversion == IPV6_VERSION && 638 (HOOKS6_INTERESTED_LOOPBACK_IN(ipst) || 639 HOOKS6_INTERESTED_LOOPBACK_OUT(ipst))) { 640 /* 641 * Build ip and tcp header to satisfy FW_HOOKS. 642 * We only build it when any hook is present. 643 */ 644 if ((mp1 = tcp_xmit_mp(tcp, mp1, tcp->tcp_mss, NULL, NULL, 645 tcp->tcp_snxt, B_TRUE, NULL, B_FALSE)) == NULL) 646 /* If tcp_xmit_mp fails, use regular path */ 647 goto unfuse; 648 649 /* 650 * The ipif and ill can be safely referenced under the 651 * protection of conn_lock - see head of function comment for 652 * conn_get_held_ipif(). It is necessary to check that both 653 * the ipif and ill can be looked up (i.e. not condemned). If 654 * not, bail out and unfuse this connection. 655 */ 656 mutex_enter(&peer_tcp->tcp_connp->conn_lock); 657 if ((peer_tcp->tcp_connp->conn_ire_cache == NULL) || 658 (peer_tcp->tcp_connp->conn_ire_cache->ire_marks & 659 IRE_MARK_CONDEMNED) || 660 ((oifp = peer_tcp->tcp_connp->conn_ire_cache->ire_ipif) 661 == NULL) || 662 (!IPIF_CAN_LOOKUP(oifp)) || 663 ((olp = oifp->ipif_ill) == NULL) || 664 (ill_check_and_refhold(olp) != 0)) { 665 mutex_exit(&peer_tcp->tcp_connp->conn_lock); 666 goto unfuse; 667 } 668 mutex_exit(&peer_tcp->tcp_connp->conn_lock); 669 670 /* PFHooks: LOOPBACK_OUT */ 671 if (tcp->tcp_ipversion == IPV4_VERSION) { 672 ipha = (ipha_t *)mp1->b_rptr; 673 674 DTRACE_PROBE4(ip4__loopback__out__start, 675 ill_t *, NULL, ill_t *, olp, 676 ipha_t *, ipha, mblk_t *, mp1); 677 FW_HOOKS(ipst->ips_ip4_loopback_out_event, 678 ipst->ips_ipv4firewall_loopback_out, 679 NULL, olp, ipha, mp1, mp1, 0, ipst); 680 DTRACE_PROBE1(ip4__loopback__out__end, mblk_t *, mp1); 681 } else { 682 ip6h = (ip6_t *)mp1->b_rptr; 683 684 DTRACE_PROBE4(ip6__loopback__out__start, 685 ill_t *, NULL, ill_t *, olp, 686 ip6_t *, ip6h, mblk_t *, mp1); 687 FW_HOOKS6(ipst->ips_ip6_loopback_out_event, 688 ipst->ips_ipv6firewall_loopback_out, 689 NULL, olp, ip6h, mp1, mp1, 0, ipst); 690 DTRACE_PROBE1(ip6__loopback__out__end, mblk_t *, mp1); 691 } 692 ill_refrele(olp); 693 694 if (mp1 == NULL) 695 goto unfuse; 696 697 /* 698 * The ipif and ill can be safely referenced under the 699 * protection of conn_lock - see head of function comment for 700 * conn_get_held_ipif(). It is necessary to check that both 701 * the ipif and ill can be looked up (i.e. not condemned). If 702 * not, bail out and unfuse this connection. 
703 */ 704 mutex_enter(&tcp->tcp_connp->conn_lock); 705 if ((tcp->tcp_connp->conn_ire_cache == NULL) || 706 (tcp->tcp_connp->conn_ire_cache->ire_marks & 707 IRE_MARK_CONDEMNED) || 708 ((iifp = tcp->tcp_connp->conn_ire_cache->ire_ipif) 709 == NULL) || 710 (!IPIF_CAN_LOOKUP(iifp)) || 711 ((ilp = iifp->ipif_ill) == NULL) || 712 (ill_check_and_refhold(ilp) != 0)) { 713 mutex_exit(&tcp->tcp_connp->conn_lock); 714 goto unfuse; 715 } 716 mutex_exit(&tcp->tcp_connp->conn_lock); 717 718 /* PFHooks: LOOPBACK_IN */ 719 if (tcp->tcp_ipversion == IPV4_VERSION) { 720 DTRACE_PROBE4(ip4__loopback__in__start, 721 ill_t *, ilp, ill_t *, NULL, 722 ipha_t *, ipha, mblk_t *, mp1); 723 FW_HOOKS(ipst->ips_ip4_loopback_in_event, 724 ipst->ips_ipv4firewall_loopback_in, 725 ilp, NULL, ipha, mp1, mp1, 0, ipst); 726 DTRACE_PROBE1(ip4__loopback__in__end, mblk_t *, mp1); 727 ill_refrele(ilp); 728 if (mp1 == NULL) 729 goto unfuse; 730 731 ip_hdr_len = IPH_HDR_LENGTH(ipha); 732 } else { 733 DTRACE_PROBE4(ip6__loopback__in__start, 734 ill_t *, ilp, ill_t *, NULL, 735 ip6_t *, ip6h, mblk_t *, mp1); 736 FW_HOOKS6(ipst->ips_ip6_loopback_in_event, 737 ipst->ips_ipv6firewall_loopback_in, 738 ilp, NULL, ip6h, mp1, mp1, 0, ipst); 739 DTRACE_PROBE1(ip6__loopback__in__end, mblk_t *, mp1); 740 ill_refrele(ilp); 741 if (mp1 == NULL) 742 goto unfuse; 743 744 ip_hdr_len = ip_hdr_length_v6(mp1, ip6h); 745 } 746 747 /* Data length might be changed by FW_HOOKS */ 748 tcph = (tcph_t *)&mp1->b_rptr[ip_hdr_len]; 749 seq = ABE32_TO_U32(tcph->th_seq); 750 recv_size += seq - tcp->tcp_snxt; 751 752 /* 753 * The message duplicated by tcp_xmit_mp is freed. 754 * Note: the original message passed in remains unchanged. 755 */ 756 freemsg(mp1); 757 } 758 759 mutex_enter(&peer_tcp->tcp_non_sq_lock); 760 /* 761 * Wake up and signal the peer; it is okay to do this before 762 * enqueueing because we are holding the lock. One of the 763 * advantages of synchronous streams is the ability for us to 764 * find out when the application performs a read on the socket, 765 * by way of tcp_fuse_rrw() entry point being called. Every 766 * data that gets enqueued onto the receiver is treated as if 767 * it has arrived at the receiving endpoint, thus generating 768 * SIGPOLL/SIGIO for asynchronous socket just as in the strrput() 769 * case. However, we only wake up the application when necessary, 770 * i.e. during the first enqueue. When tcp_fuse_rrw() is called 771 * it will send everything upstream. 772 */ 773 if (peer_tcp->tcp_direct_sockfs && !urgent && 774 !TCP_IS_DETACHED(peer_tcp)) { 775 /* Update poll events and send SIGPOLL/SIGIO if necessary */ 776 STR_WAKEUP_SENDSIG(STREAM(peer_tcp->tcp_rq), 777 peer_tcp->tcp_rcv_list); 778 } 779 780 /* 781 * Enqueue data into the peer's receive list; we may or may not 782 * drain the contents depending on the conditions below. 
     */
    if (IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
        peer_tcp->tcp_connp->conn_upper_handle != NULL) {
        int error;
        int flags = 0;

        if ((tcp->tcp_valid_bits & TCP_URG_VALID) &&
            (tcp->tcp_urg == tcp->tcp_snxt)) {
            flags = MSG_OOB;
            (*peer_tcp->tcp_connp->conn_upcalls->su_signal_oob)
                (peer_tcp->tcp_connp->conn_upper_handle, 0);
            tcp->tcp_valid_bits &= ~TCP_URG_VALID;
        }
        (*peer_tcp->tcp_connp->conn_upcalls->su_recv)(
            peer_tcp->tcp_connp->conn_upper_handle, mp, recv_size,
            flags, &error, &push);
    } else {
        if (IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
            (tcp->tcp_valid_bits & TCP_URG_VALID) &&
            (tcp->tcp_urg == tcp->tcp_snxt)) {
            /*
             * Cannot deal with urgent pointers
             * that arrive before the connection has been
             * accept()ed.
             */
            tcp->tcp_valid_bits &= ~TCP_URG_VALID;
            freemsg(mp);
            mutex_exit(&peer_tcp->tcp_non_sq_lock);
            return (B_TRUE);
        }

        tcp_rcv_enqueue(peer_tcp, mp, recv_size);
    }

    /* In case it wrapped around and also to keep it constant */
    peer_tcp->tcp_rwnd += recv_size;
    /*
     * We increase the peer's unread message count here whilst still
     * holding its tcp_non_sq_lock. This ensures that the increment
     * occurs in the same lock acquisition perimeter as the enqueue.
     * Depending on lock hierarchy, we can release these locks, which
     * creates a window in which we can race with tcp_fuse_rrw().
     */
    peer_tcp->tcp_fuse_rcv_unread_cnt++;

    /*
     * Exercise flow-control when needed; we will get back-enabled
     * in either tcp_accept_finish(), tcp_unfuse(), or tcp_fuse_rrw().
     * If tcp_direct_sockfs is on or if the peer endpoint is detached,
     * we emulate streams flow control by checking the peer's queue
     * size and high water mark; otherwise we simply use canputnext()
     * to decide if we need to stop our flow.
     *
     * The outstanding unread data block check does not apply for a
     * detached receiver; this is to avoid unnecessary blocking of the
     * sender while the accept is currently in progress and is quite
     * similar to the regular tcp.
     */
    if (TCP_IS_DETACHED(peer_tcp) || max_unread == 0)
        max_unread = UINT_MAX;

    /*
     * Since we are accessing our tcp_flow_stopped and might modify it,
     * we need to take tcp->tcp_non_sq_lock. The lock for the highest
     * address is held first (see the standalone sketch of this
     * address-ordered locking at the end of this file). Dropping
     * peer_tcp->tcp_non_sq_lock should not be an issue here since we
     * are within the squeue and the peer won't disappear.
     */
    if (tcp > peer_tcp) {
        mutex_exit(&peer_tcp->tcp_non_sq_lock);
        mutex_enter(&tcp->tcp_non_sq_lock);
        mutex_enter(&peer_tcp->tcp_non_sq_lock);
    } else {
        mutex_enter(&tcp->tcp_non_sq_lock);
    }
    flow_stopped = tcp->tcp_flow_stopped;
    if (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) &&
        (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater ||
        peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
        (!peer_tcp->tcp_direct_sockfs && !TCP_IS_DETACHED(peer_tcp) &&
        !IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
        !canputnext(peer_tcp->tcp_rq))) {
        peer_data_queued = B_TRUE;
    }

    if (!flow_stopped && (peer_data_queued ||
        (TCP_UNSENT_BYTES(tcp) >= tcp->tcp_xmit_hiwater))) {
        tcp_setqfull(tcp);
        flow_stopped = B_TRUE;
        TCP_STAT(tcps, tcp_fusion_flowctl);
        DTRACE_PROBE4(tcp__fuse__output__flowctl, tcp_t *, tcp,
            uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt,
            uint_t, peer_tcp->tcp_fuse_rcv_unread_cnt);
    } else if (flow_stopped && !peer_data_queued &&
        (TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater)) {
        tcp_clrqfull(tcp);
        TCP_STAT(tcps, tcp_fusion_backenabled);
        flow_stopped = B_FALSE;
    }
    mutex_exit(&tcp->tcp_non_sq_lock);

    /*
     * If we are in synchronous streams mode and the peer read queue is
     * not full, then schedule a push timer if one is not scheduled
     * already. This is needed for applications which use MSG_PEEK to
     * determine the number of bytes available before issuing a 'real'
     * read. It also makes flow control more deterministic, particularly
     * for smaller message sizes.
     */
    if (!urgent && peer_tcp->tcp_direct_sockfs &&
        peer_tcp->tcp_push_tid == 0 && !TCP_IS_DETACHED(peer_tcp) &&
        canputnext(peer_tcp->tcp_rq)) {
        peer_tcp->tcp_push_tid = TCP_TIMER(peer_tcp, tcp_push_timer,
            MSEC_TO_TICK(tcps->tcps_push_timer_interval));
    }
    mutex_exit(&peer_tcp->tcp_non_sq_lock);
    ipst->ips_loopback_packets++;
    tcp->tcp_last_sent_len = send_size;

    /* Need to adjust the following SNMP MIB-related variables */
    tcp->tcp_snxt += send_size;
    tcp->tcp_suna = tcp->tcp_snxt;
    peer_tcp->tcp_rnxt += recv_size;
    peer_tcp->tcp_rack = peer_tcp->tcp_rnxt;

    BUMP_MIB(&tcps->tcps_mib, tcpOutDataSegs);
    UPDATE_MIB(&tcps->tcps_mib, tcpOutDataBytes, send_size);

    BUMP_MIB(&tcps->tcps_mib, tcpInSegs);
    BUMP_MIB(&tcps->tcps_mib, tcpInDataInorderSegs);
    UPDATE_MIB(&tcps->tcps_mib, tcpInDataInorderBytes, send_size);

    BUMP_LOCAL(tcp->tcp_obsegs);
    BUMP_LOCAL(peer_tcp->tcp_ibsegs);

    DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);

    if (!TCP_IS_DETACHED(peer_tcp)) {
        /*
         * Drain the peer's receive queue if it has urgent data or if
         * we're not flow-controlled. There is no need for draining
         * normal data when tcp_direct_sockfs is on because the peer
         * will pull the data via tcp_fuse_rrw().
         */
        if (urgent || (!flow_stopped && !peer_tcp->tcp_direct_sockfs)) {
            ASSERT(IPCL_IS_NONSTR(peer_tcp->tcp_connp) ||
                peer_tcp->tcp_rcv_list != NULL);
            /*
             * For TLI-based streams, a thread in tcp_accept_swap()
             * can race with us. That thread will ensure that the
             * correct peer_tcp->tcp_rq is globally visible before
             * peer_tcp->tcp_detached is visible as clear, but we
             * must also ensure that the load of tcp_rq cannot be
             * reordered to be before the tcp_detached check.
937 */ 938 membar_consumer(); 939 (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp, 940 NULL); 941 /* 942 * If synchronous streams was stopped above due 943 * to the presence of urgent data, re-enable it. 944 */ 945 if (urgent) 946 TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp); 947 } 948 } 949 return (B_TRUE); 950 unfuse: 951 tcp_unfuse(tcp); 952 return (B_FALSE); 953 } 954 955 /* 956 * This routine gets called to deliver data upstream on a fused or 957 * previously fused tcp loopback endpoint; the latter happens only 958 * when there is a pending SIGURG signal plus urgent data that can't 959 * be sent upstream in the past. 960 */ 961 boolean_t 962 tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp) 963 { 964 mblk_t *mp; 965 conn_t *connp = tcp->tcp_connp; 966 967 #ifdef DEBUG 968 uint_t cnt = 0; 969 #endif 970 tcp_stack_t *tcps = tcp->tcp_tcps; 971 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 972 boolean_t sd_rd_eof = B_FALSE; 973 974 ASSERT(tcp->tcp_loopback); 975 ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg); 976 ASSERT(!tcp->tcp_fused || tcp->tcp_loopback_peer != NULL); 977 ASSERT(IPCL_IS_NONSTR(connp) || sigurg_mpp != NULL || tcp->tcp_fused); 978 979 /* No need for the push timer now, in case it was scheduled */ 980 if (tcp->tcp_push_tid != 0) { 981 (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid); 982 tcp->tcp_push_tid = 0; 983 } 984 /* 985 * If there's urgent data sitting in receive list and we didn't 986 * get a chance to send up a SIGURG signal, make sure we send 987 * it first before draining in order to ensure that SIOCATMARK 988 * works properly. 989 */ 990 if (tcp->tcp_fused_sigurg) { 991 tcp->tcp_fused_sigurg = B_FALSE; 992 if (IPCL_IS_NONSTR(connp)) { 993 (*connp->conn_upcalls->su_signal_oob) 994 (connp->conn_upper_handle, 0); 995 } else { 996 /* 997 * sigurg_mpp is normally NULL, i.e. when we're still 998 * fused and didn't get here because of tcp_unfuse(). 999 * In this case try hard to allocate the M_PCSIG mblk. 1000 */ 1001 if (sigurg_mpp == NULL && 1002 (mp = allocb(1, BPRI_HI)) == NULL && 1003 (mp = allocb_tryhard(1)) == NULL) { 1004 /* Alloc failed; try again next time */ 1005 tcp->tcp_push_tid = TCP_TIMER(tcp, 1006 tcp_push_timer, 1007 MSEC_TO_TICK( 1008 tcps->tcps_push_timer_interval)); 1009 return (B_TRUE); 1010 } else if (sigurg_mpp != NULL) { 1011 /* 1012 * Use the supplied M_PCSIG mblk; it means we're 1013 * either unfused or in the process of unfusing, 1014 * and the drain must happen now. 1015 */ 1016 mp = *sigurg_mpp; 1017 *sigurg_mpp = NULL; 1018 } 1019 ASSERT(mp != NULL); 1020 1021 /* Send up the signal */ 1022 DB_TYPE(mp) = M_PCSIG; 1023 *mp->b_wptr++ = (uchar_t)SIGURG; 1024 putnext(q, mp); 1025 } 1026 /* 1027 * Let the regular tcp_rcv_drain() path handle 1028 * draining the data if we're no longer fused. 1029 */ 1030 if (!tcp->tcp_fused) 1031 return (B_FALSE); 1032 } 1033 1034 /* 1035 * In the synchronous streams case, we generate SIGPOLL/SIGIO for 1036 * each M_DATA that gets enqueued onto the receiver. At this point 1037 * we are about to drain any queued data via putnext(). In order 1038 * to avoid extraneous signal generation from strrput(), we set 1039 * STRGETINPROG flag at the stream head prior to the draining and 1040 * restore it afterwards. This masks out signal generation only 1041 * for M_DATA messages and does not affect urgent data. We only do 1042 * this if the STREOF flag is not set which can happen if the 1043 * application shuts down the read side of a stream. 
In this case 1044 * we simply free these messages to approximate the flushq behavior 1045 * which normally occurs when STREOF is on the stream head read queue. 1046 */ 1047 if (tcp->tcp_direct_sockfs) 1048 sd_rd_eof = strrput_sig(q, B_FALSE); 1049 1050 /* Drain the data */ 1051 while ((mp = tcp->tcp_rcv_list) != NULL) { 1052 tcp->tcp_rcv_list = mp->b_next; 1053 mp->b_next = NULL; 1054 #ifdef DEBUG 1055 cnt += msgdsize(mp); 1056 #endif 1057 ASSERT(!IPCL_IS_NONSTR(connp)); 1058 if (sd_rd_eof) { 1059 freemsg(mp); 1060 } else { 1061 putnext(q, mp); 1062 TCP_STAT(tcps, tcp_fusion_putnext); 1063 } 1064 } 1065 1066 if (tcp->tcp_direct_sockfs && !sd_rd_eof) 1067 (void) strrput_sig(q, B_TRUE); 1068 1069 #ifdef DEBUG 1070 ASSERT(cnt == tcp->tcp_rcv_cnt); 1071 #endif 1072 tcp->tcp_rcv_last_head = NULL; 1073 tcp->tcp_rcv_last_tail = NULL; 1074 tcp->tcp_rcv_cnt = 0; 1075 tcp->tcp_fuse_rcv_unread_cnt = 0; 1076 tcp->tcp_rwnd = tcp->tcp_recv_hiwater; 1077 1078 if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <= 1079 peer_tcp->tcp_xmit_lowater)) { 1080 tcp_clrqfull(peer_tcp); 1081 TCP_STAT(tcps, tcp_fusion_backenabled); 1082 } 1083 1084 return (B_TRUE); 1085 } 1086 1087 /* 1088 * Synchronous stream entry point for sockfs to retrieve 1089 * data directly from tcp_rcv_list. 1090 * tcp_fuse_rrw() might end up modifying the peer's tcp_flow_stopped, 1091 * for which it must take the tcp_non_sq_lock of the peer as well 1092 * making any change. The order of taking the locks is based on 1093 * the TCP pointer itself. Before we get the peer we need to take 1094 * our tcp_non_sq_lock so that the peer doesn't disappear. However, 1095 * we cannot drop the lock if we have to grab the peer's lock (because 1096 * of ordering), since the peer might disappear in the interim. So, 1097 * we take our tcp_non_sq_lock, get the peer, increment the ref on the 1098 * peer's conn, drop all the locks and then take the tcp_non_sq_lock in the 1099 * desired order. Incrementing the conn ref on the peer means that the 1100 * peer won't disappear when we drop our tcp_non_sq_lock. 1101 */ 1102 int 1103 tcp_fuse_rrw(queue_t *q, struiod_t *dp) 1104 { 1105 tcp_t *tcp = Q_TO_CONN(q)->conn_tcp; 1106 mblk_t *mp; 1107 tcp_t *peer_tcp; 1108 tcp_stack_t *tcps = tcp->tcp_tcps; 1109 1110 mutex_enter(&tcp->tcp_non_sq_lock); 1111 1112 /* 1113 * If tcp_fuse_syncstr_plugged is set, then another thread is moving 1114 * the underlying data to the stream head. We need to wait until it's 1115 * done, then return EBUSY so that strget() will dequeue data from the 1116 * stream head to ensure data is drained in-order. 1117 */ 1118 plugged: 1119 if (tcp->tcp_fuse_syncstr_plugged) { 1120 do { 1121 cv_wait(&tcp->tcp_fuse_plugcv, &tcp->tcp_non_sq_lock); 1122 } while (tcp->tcp_fuse_syncstr_plugged); 1123 1124 mutex_exit(&tcp->tcp_non_sq_lock); 1125 TCP_STAT(tcps, tcp_fusion_rrw_plugged); 1126 TCP_STAT(tcps, tcp_fusion_rrw_busy); 1127 return (EBUSY); 1128 } 1129 1130 peer_tcp = tcp->tcp_loopback_peer; 1131 1132 /* 1133 * If someone had turned off tcp_direct_sockfs or if synchronous 1134 * streams is stopped, we return EBUSY. This causes strget() to 1135 * dequeue data from the stream head instead. 1136 */ 1137 if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) { 1138 mutex_exit(&tcp->tcp_non_sq_lock); 1139 TCP_STAT(tcps, tcp_fusion_rrw_busy); 1140 return (EBUSY); 1141 } 1142 1143 /* 1144 * Grab lock in order. The highest addressed tcp is locked first. 
1145 * We don't do this within the tcp_rcv_list check since if we 1146 * have to drop the lock, for ordering, then the tcp_rcv_list 1147 * could change. 1148 */ 1149 if (peer_tcp > tcp) { 1150 CONN_INC_REF(peer_tcp->tcp_connp); 1151 mutex_exit(&tcp->tcp_non_sq_lock); 1152 mutex_enter(&peer_tcp->tcp_non_sq_lock); 1153 mutex_enter(&tcp->tcp_non_sq_lock); 1154 /* 1155 * This might have changed in the interim 1156 * Once read-side tcp_non_sq_lock is dropped above 1157 * anything can happen, we need to check all 1158 * known conditions again once we reaquire 1159 * read-side tcp_non_sq_lock. 1160 */ 1161 if (tcp->tcp_fuse_syncstr_plugged) { 1162 mutex_exit(&peer_tcp->tcp_non_sq_lock); 1163 CONN_DEC_REF(peer_tcp->tcp_connp); 1164 goto plugged; 1165 } 1166 if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) { 1167 mutex_exit(&tcp->tcp_non_sq_lock); 1168 mutex_exit(&peer_tcp->tcp_non_sq_lock); 1169 CONN_DEC_REF(peer_tcp->tcp_connp); 1170 TCP_STAT(tcps, tcp_fusion_rrw_busy); 1171 return (EBUSY); 1172 } 1173 CONN_DEC_REF(peer_tcp->tcp_connp); 1174 } else { 1175 mutex_enter(&peer_tcp->tcp_non_sq_lock); 1176 } 1177 1178 if ((mp = tcp->tcp_rcv_list) != NULL) { 1179 1180 DTRACE_PROBE3(tcp__fuse__rrw, tcp_t *, tcp, 1181 uint32_t, tcp->tcp_rcv_cnt, ssize_t, dp->d_uio.uio_resid); 1182 1183 tcp->tcp_rcv_list = NULL; 1184 TCP_STAT(tcps, tcp_fusion_rrw_msgcnt); 1185 1186 /* 1187 * At this point nothing should be left in tcp_rcv_list. 1188 * The only possible case where we would have a chain of 1189 * b_next-linked messages is urgent data, but we wouldn't 1190 * be here if that's true since urgent data is delivered 1191 * via putnext() and synchronous streams is stopped until 1192 * tcp_fuse_rcv_drain() is finished. 1193 */ 1194 ASSERT(DB_TYPE(mp) == M_DATA && mp->b_next == NULL); 1195 1196 tcp->tcp_rcv_last_head = NULL; 1197 tcp->tcp_rcv_last_tail = NULL; 1198 tcp->tcp_rcv_cnt = 0; 1199 tcp->tcp_fuse_rcv_unread_cnt = 0; 1200 1201 if (peer_tcp->tcp_flow_stopped && 1202 (TCP_UNSENT_BYTES(peer_tcp) <= 1203 peer_tcp->tcp_xmit_lowater)) { 1204 tcp_clrqfull(peer_tcp); 1205 TCP_STAT(tcps, tcp_fusion_backenabled); 1206 } 1207 } 1208 mutex_exit(&peer_tcp->tcp_non_sq_lock); 1209 /* 1210 * Either we just dequeued everything or we get here from sockfs 1211 * and have nothing to return; in this case clear RSLEEP. 1212 */ 1213 ASSERT(tcp->tcp_rcv_last_head == NULL); 1214 ASSERT(tcp->tcp_rcv_last_tail == NULL); 1215 ASSERT(tcp->tcp_rcv_cnt == 0); 1216 ASSERT(tcp->tcp_fuse_rcv_unread_cnt == 0); 1217 STR_WAKEUP_CLEAR(STREAM(q)); 1218 1219 mutex_exit(&tcp->tcp_non_sq_lock); 1220 dp->d_mp = mp; 1221 return (0); 1222 } 1223 1224 /* 1225 * Synchronous stream entry point used by certain ioctls to retrieve 1226 * information about or peek into the tcp_rcv_list. 1227 */ 1228 int 1229 tcp_fuse_rinfop(queue_t *q, infod_t *dp) 1230 { 1231 tcp_t *tcp = Q_TO_CONN(q)->conn_tcp; 1232 mblk_t *mp; 1233 uint_t cmd = dp->d_cmd; 1234 int res = 0; 1235 int error = 0; 1236 struct stdata *stp = STREAM(q); 1237 1238 mutex_enter(&tcp->tcp_non_sq_lock); 1239 /* If shutdown on read has happened, return nothing */ 1240 mutex_enter(&stp->sd_lock); 1241 if (stp->sd_flag & STREOF) { 1242 mutex_exit(&stp->sd_lock); 1243 goto done; 1244 } 1245 mutex_exit(&stp->sd_lock); 1246 1247 /* 1248 * It is OK not to return an answer if tcp_rcv_list is 1249 * currently not accessible. 
1250 */ 1251 if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped || 1252 tcp->tcp_fuse_syncstr_plugged || (mp = tcp->tcp_rcv_list) == NULL) 1253 goto done; 1254 1255 if (cmd & INFOD_COUNT) { 1256 /* 1257 * We have at least one message and 1258 * could return only one at a time. 1259 */ 1260 dp->d_count++; 1261 res |= INFOD_COUNT; 1262 } 1263 if (cmd & INFOD_BYTES) { 1264 /* 1265 * Return size of all data messages. 1266 */ 1267 dp->d_bytes += tcp->tcp_rcv_cnt; 1268 res |= INFOD_BYTES; 1269 } 1270 if (cmd & INFOD_FIRSTBYTES) { 1271 /* 1272 * Return size of first data message. 1273 */ 1274 dp->d_bytes = msgdsize(mp); 1275 res |= INFOD_FIRSTBYTES; 1276 dp->d_cmd &= ~INFOD_FIRSTBYTES; 1277 } 1278 if (cmd & INFOD_COPYOUT) { 1279 mblk_t *mp1; 1280 int n; 1281 1282 if (DB_TYPE(mp) == M_DATA) { 1283 mp1 = mp; 1284 } else { 1285 mp1 = mp->b_cont; 1286 ASSERT(mp1 != NULL); 1287 } 1288 1289 /* 1290 * Return data contents of first message. 1291 */ 1292 ASSERT(DB_TYPE(mp1) == M_DATA); 1293 while (mp1 != NULL && dp->d_uiop->uio_resid > 0) { 1294 n = MIN(dp->d_uiop->uio_resid, MBLKL(mp1)); 1295 if (n != 0 && (error = uiomove((char *)mp1->b_rptr, n, 1296 UIO_READ, dp->d_uiop)) != 0) { 1297 goto done; 1298 } 1299 mp1 = mp1->b_cont; 1300 } 1301 res |= INFOD_COPYOUT; 1302 dp->d_cmd &= ~INFOD_COPYOUT; 1303 } 1304 done: 1305 mutex_exit(&tcp->tcp_non_sq_lock); 1306 1307 dp->d_res |= res; 1308 1309 return (error); 1310 } 1311 1312 /* 1313 * Enable synchronous streams on a fused tcp loopback endpoint. 1314 */ 1315 static void 1316 tcp_fuse_syncstr_enable(tcp_t *tcp) 1317 { 1318 queue_t *rq = tcp->tcp_rq; 1319 struct stdata *stp = STREAM(rq); 1320 1321 /* We can only enable synchronous streams for sockfs mode */ 1322 tcp->tcp_direct_sockfs = tcp->tcp_issocket && do_tcp_direct_sockfs; 1323 1324 if (!tcp->tcp_direct_sockfs) 1325 return; 1326 1327 mutex_enter(&stp->sd_lock); 1328 mutex_enter(QLOCK(rq)); 1329 1330 /* 1331 * We replace our q_qinfo with one that has the qi_rwp entry point. 1332 * Clear SR_SIGALLDATA because we generate the equivalent signal(s) 1333 * for every enqueued data in tcp_fuse_output(). 1334 */ 1335 rq->q_qinfo = &tcp_loopback_rinit; 1336 rq->q_struiot = tcp_loopback_rinit.qi_struiot; 1337 stp->sd_struiordq = rq; 1338 stp->sd_rput_opt &= ~SR_SIGALLDATA; 1339 1340 mutex_exit(QLOCK(rq)); 1341 mutex_exit(&stp->sd_lock); 1342 } 1343 1344 /* 1345 * Disable synchronous streams on a fused tcp loopback endpoint. 1346 */ 1347 static void 1348 tcp_fuse_syncstr_disable(tcp_t *tcp) 1349 { 1350 queue_t *rq = tcp->tcp_rq; 1351 struct stdata *stp = STREAM(rq); 1352 1353 if (!tcp->tcp_direct_sockfs) 1354 return; 1355 1356 mutex_enter(&stp->sd_lock); 1357 mutex_enter(QLOCK(rq)); 1358 1359 /* 1360 * Reset q_qinfo to point to the default tcp entry points. 1361 * Also restore SR_SIGALLDATA so that strrput() can generate 1362 * the signals again for future M_DATA messages. 1363 */ 1364 rq->q_qinfo = &tcp_rinitv4; /* No open - same as rinitv6 */ 1365 rq->q_struiot = tcp_rinitv4.qi_struiot; 1366 stp->sd_struiordq = NULL; 1367 stp->sd_rput_opt |= SR_SIGALLDATA; 1368 tcp->tcp_direct_sockfs = B_FALSE; 1369 1370 mutex_exit(QLOCK(rq)); 1371 mutex_exit(&stp->sd_lock); 1372 } 1373 1374 /* 1375 * Enable synchronous streams on a pair of fused tcp endpoints. 
1376 */ 1377 void 1378 tcp_fuse_syncstr_enable_pair(tcp_t *tcp) 1379 { 1380 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 1381 1382 ASSERT(tcp->tcp_fused); 1383 ASSERT(peer_tcp != NULL); 1384 1385 tcp_fuse_syncstr_enable(tcp); 1386 tcp_fuse_syncstr_enable(peer_tcp); 1387 } 1388 1389 /* 1390 * Used to enable/disable signal generation at the stream head. We already 1391 * generated the signal(s) for these messages when they were enqueued on the 1392 * receiver. We also check if STREOF is set here. If it is, we return false 1393 * and let the caller decide what to do. 1394 */ 1395 static boolean_t 1396 strrput_sig(queue_t *q, boolean_t on) 1397 { 1398 struct stdata *stp = STREAM(q); 1399 1400 mutex_enter(&stp->sd_lock); 1401 if (stp->sd_flag == STREOF) { 1402 mutex_exit(&stp->sd_lock); 1403 return (B_TRUE); 1404 } 1405 if (on) 1406 stp->sd_flag &= ~STRGETINPROG; 1407 else 1408 stp->sd_flag |= STRGETINPROG; 1409 mutex_exit(&stp->sd_lock); 1410 1411 return (B_FALSE); 1412 } 1413 1414 /* 1415 * Disable synchronous streams on a pair of fused tcp endpoints and drain 1416 * any queued data; called either during unfuse or upon transitioning from 1417 * a socket to a stream endpoint due to _SIOCSOCKFALLBACK. 1418 */ 1419 void 1420 tcp_fuse_disable_pair(tcp_t *tcp, boolean_t unfusing) 1421 { 1422 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 1423 tcp_stack_t *tcps = tcp->tcp_tcps; 1424 1425 ASSERT(tcp->tcp_fused); 1426 ASSERT(peer_tcp != NULL); 1427 1428 /* 1429 * Force any tcp_fuse_rrw() calls to block until we've moved the data 1430 * onto the stream head. 1431 */ 1432 TCP_FUSE_SYNCSTR_PLUG_DRAIN(tcp); 1433 TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp); 1434 1435 /* 1436 * Cancel any pending push timers. 1437 */ 1438 if (tcp->tcp_push_tid != 0) { 1439 (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid); 1440 tcp->tcp_push_tid = 0; 1441 } 1442 if (peer_tcp->tcp_push_tid != 0) { 1443 (void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid); 1444 peer_tcp->tcp_push_tid = 0; 1445 } 1446 1447 /* 1448 * Drain any pending data; the detached check is needed because 1449 * we may be called as a result of a tcp_unfuse() triggered by 1450 * tcp_fuse_output(). Note that in case of a detached tcp, the 1451 * draining will happen later after the tcp is unfused. For non- 1452 * urgent data, this can be handled by the regular tcp_rcv_drain(). 1453 * If we have urgent data sitting in the receive list, we will 1454 * need to send up a SIGURG signal first before draining the data. 1455 * All of these will be handled by the code in tcp_fuse_rcv_drain() 1456 * when called from tcp_rcv_drain(). 1457 */ 1458 if (!TCP_IS_DETACHED(tcp)) { 1459 (void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp, 1460 (unfusing ? &tcp->tcp_fused_sigurg_mp : NULL)); 1461 } 1462 if (!TCP_IS_DETACHED(peer_tcp)) { 1463 (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp, 1464 (unfusing ? &peer_tcp->tcp_fused_sigurg_mp : NULL)); 1465 } 1466 1467 /* 1468 * Make all current and future tcp_fuse_rrw() calls fail with EBUSY. 1469 * To ensure threads don't sneak past the checks in tcp_fuse_rrw(), 1470 * a given stream must be stopped prior to being unplugged (but the 1471 * ordering of operations between the streams is unimportant). 
1472 */ 1473 TCP_FUSE_SYNCSTR_STOP(tcp); 1474 TCP_FUSE_SYNCSTR_STOP(peer_tcp); 1475 TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(tcp); 1476 TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp); 1477 1478 /* Lift up any flow-control conditions */ 1479 if (tcp->tcp_flow_stopped) { 1480 tcp_clrqfull(tcp); 1481 TCP_STAT(tcps, tcp_fusion_backenabled); 1482 } 1483 if (peer_tcp->tcp_flow_stopped) { 1484 tcp_clrqfull(peer_tcp); 1485 TCP_STAT(tcps, tcp_fusion_backenabled); 1486 } 1487 1488 /* Disable synchronous streams */ 1489 if (!IPCL_IS_NONSTR(tcp->tcp_connp)) 1490 tcp_fuse_syncstr_disable(tcp); 1491 if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) 1492 tcp_fuse_syncstr_disable(peer_tcp); 1493 } 1494 1495 /* 1496 * Calculate the size of receive buffer for a fused tcp endpoint. 1497 */ 1498 size_t 1499 tcp_fuse_set_rcv_hiwat(tcp_t *tcp, size_t rwnd) 1500 { 1501 tcp_stack_t *tcps = tcp->tcp_tcps; 1502 1503 ASSERT(tcp->tcp_fused); 1504 1505 /* Ensure that value is within the maximum upper bound */ 1506 if (rwnd > tcps->tcps_max_buf) 1507 rwnd = tcps->tcps_max_buf; 1508 1509 /* Obey the absolute minimum tcp receive high water mark */ 1510 if (rwnd < tcps->tcps_sth_rcv_hiwat) 1511 rwnd = tcps->tcps_sth_rcv_hiwat; 1512 1513 /* 1514 * Round up to system page size in case SO_RCVBUF is modified 1515 * after SO_SNDBUF; the latter is also similarly rounded up. 1516 */ 1517 rwnd = P2ROUNDUP_TYPED(rwnd, PAGESIZE, size_t); 1518 tcp->tcp_fuse_rcv_hiwater = rwnd; 1519 return (rwnd); 1520 } 1521 1522 /* 1523 * Calculate the maximum outstanding unread data block for a fused tcp endpoint. 1524 */ 1525 int 1526 tcp_fuse_maxpsz_set(tcp_t *tcp) 1527 { 1528 tcp_t *peer_tcp = tcp->tcp_loopback_peer; 1529 uint_t sndbuf = tcp->tcp_xmit_hiwater; 1530 uint_t maxpsz = sndbuf; 1531 1532 ASSERT(tcp->tcp_fused); 1533 ASSERT(peer_tcp != NULL); 1534 ASSERT(peer_tcp->tcp_fuse_rcv_hiwater != 0); 1535 /* 1536 * In the fused loopback case, we want the stream head to split 1537 * up larger writes into smaller chunks for a more accurate flow- 1538 * control accounting. Our maxpsz is half of the sender's send 1539 * buffer or the receiver's receive buffer, whichever is smaller. 1540 * We round up the buffer to system page size due to the lack of 1541 * TCP MSS concept in Fusion. 1542 */ 1543 if (maxpsz > peer_tcp->tcp_fuse_rcv_hiwater) 1544 maxpsz = peer_tcp->tcp_fuse_rcv_hiwater; 1545 maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1; 1546 1547 /* 1548 * Calculate the peer's limit for the number of outstanding unread 1549 * data block. This is the amount of data blocks that are allowed 1550 * to reside in the receiver's queue before the sender gets flow 1551 * controlled. It is used only in the synchronous streams mode as 1552 * a way to throttle the sender when it performs consecutive writes 1553 * faster than can be read. The value is derived from SO_SNDBUF in 1554 * order to give the sender some control; we divide it with a large 1555 * value (16KB) to produce a fairly low initial limit. 1556 */ 1557 if (tcp_fusion_rcv_unread_min == 0) { 1558 /* A value of 0 means that we disable the check */ 1559 peer_tcp->tcp_fuse_rcv_unread_hiwater = 0; 1560 } else { 1561 peer_tcp->tcp_fuse_rcv_unread_hiwater = 1562 MAX(sndbuf >> 14, tcp_fusion_rcv_unread_min); 1563 } 1564 return (maxpsz); 1565 } 1566
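
/*
 * Illustrative, standalone userland sketch (not part of the build) of the
 * buffer-sizing arithmetic described in tcp_fuse_set_rcv_hiwat() and
 * tcp_fuse_maxpsz_set() above. The constants below (page size, maximum and
 * minimum buffer sizes, and the 8-block unread minimum) are assumed example
 * values rather than the live tunables, and the ex_* names are hypothetical.
 * Kept under #if 0 so it is never compiled as part of this module.
 */
#if 0
#include <stdio.h>
#include <stddef.h>

#define EX_PAGESIZE         4096UL              /* assumed page size */
#define EX_MAX_BUF          (1024UL * 1024UL)   /* assumed tcps_max_buf */
#define EX_STH_RCV_HIWAT    (4UL * 1024UL)      /* assumed tcps_sth_rcv_hiwat */
#define EX_UNREAD_MIN       8UL                 /* mirrors TCP_FUSION_RCV_UNREAD_MIN */

/* Round x up to the next multiple of the power-of-two alignment a. */
#define EX_P2ROUNDUP(x, a)  (((x) + (a) - 1) & ~((a) - 1))

/* Clamp and page-round the receive window, as tcp_fuse_set_rcv_hiwat() does. */
static size_t
ex_set_rcv_hiwat(size_t rwnd)
{
    if (rwnd > EX_MAX_BUF)
        rwnd = EX_MAX_BUF;
    if (rwnd < EX_STH_RCV_HIWAT)
        rwnd = EX_STH_RCV_HIWAT;
    return (EX_P2ROUNDUP(rwnd, EX_PAGESIZE));
}

/*
 * Derive maxpsz (half of the smaller of the send buffer and the peer's
 * receive high water mark, page-rounded) and the unread data-block limit
 * (SO_SNDBUF / 16KB, but no less than the 8-block minimum), following the
 * reasoning in tcp_fuse_maxpsz_set() for the non-disabled case.
 */
static size_t
ex_maxpsz(size_t sndbuf, size_t peer_rcv_hiwat, size_t *unread_hiwat)
{
    size_t maxpsz = (sndbuf < peer_rcv_hiwat) ? sndbuf : peer_rcv_hiwat;

    *unread_hiwat = ((sndbuf >> 14) > EX_UNREAD_MIN) ?
        (sndbuf >> 14) : EX_UNREAD_MIN;
    return (EX_P2ROUNDUP(maxpsz, EX_PAGESIZE) >> 1);
}

int
main(void)
{
    size_t unread;
    size_t hiwat = ex_set_rcv_hiwat(49152);     /* e.g. a 48KB SO_RCVBUF */
    size_t maxpsz = ex_maxpsz(49152, hiwat, &unread);

    (void) printf("rcv hiwat %zu, maxpsz %zu, unread limit %zu\n",
        hiwat, maxpsz, unread);
    return (0);
}
#endif  /* illustrative sketch */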
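
/*
 * Illustrative, standalone pthreads sketch (not part of the build) of the
 * address-ordered lock acquisition used by tcp_fuse_output() and
 * tcp_fuse_rrw() when both tcp_non_sq_locks must be held: the lock with the
 * higher address is taken first. The ex_* names are hypothetical, and the
 * pointer comparison mirrors the tcp pointer comparison in the code above.
 * Kept under #if 0 so it is never compiled as part of this module.
 */
#if 0
#include <pthread.h>

/*
 * Acquire both mutexes in a single global order derived from their
 * addresses, higher address first. Because every thread takes the pair
 * in the same order, an AB/BA deadlock between the two fused endpoints
 * cannot occur.
 */
static void
ex_lock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
    if (a > b) {
        (void) pthread_mutex_lock(a);
        (void) pthread_mutex_lock(b);
    } else {
        (void) pthread_mutex_lock(b);
        (void) pthread_mutex_lock(a);
    }
}

static void
ex_unlock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
    (void) pthread_mutex_unlock(a);
    (void) pthread_mutex_unlock(b);
}
#endif  /* illustrative sketch */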