/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/stream.h>
#include <sys/strsun.h>
#include <sys/strsubr.h>
#include <sys/debug.h>
#include <sys/sdt.h>
#include <sys/cmn_err.h>
#include <sys/tihdr.h>

#include <inet/common.h>
#include <inet/ip.h>
#include <inet/ip_impl.h>
#include <inet/tcp.h>
#include <inet/tcp_impl.h>
#include <inet/ipsec_impl.h>
#include <inet/ipclassifier.h>
#include <inet/ipp_common.h>

/*
 * This file implements TCP fusion - a protocol-less data path for TCP
 * loopback connections. The fusion of two local TCP endpoints occurs
 * at connection establishment time. Various conditions (see details
 * in tcp_fuse()) need to be met for fusion to be successful. If it
 * fails, we fall back to the regular TCP data path; if it succeeds,
 * both endpoints proceed to use tcp_fuse_output() as the transmit path.
 * tcp_fuse_output() enqueues application data directly onto the peer's
 * receive queue; no protocol processing is involved. After enqueueing
 * the data, the sender can either push (putnext) data up the receiver's
 * read queue; or the sender can simply return and let the receiver
 * retrieve the enqueued data via the synchronous streams entry point
 * tcp_fuse_rrw(). The latter path is taken if synchronous streams is
 * enabled (the default). It is disabled if sockfs no longer resides
 * directly on top of the tcp module due to a module insertion or removal.
 * It also needs to be temporarily disabled when sending urgent data
 * because the tcp_fuse_rrw() path bypasses the M_PROTO processing done
 * by the strsock_proto() hook.
 *
 * Synchronization is handled by squeue and the mutex tcp_non_sq_lock.
 * One of the requirements for fusion to succeed is that both endpoints
 * need to be using the same squeue. This ensures that neither side
 * can disappear while the other side is still sending data. By itself,
 * squeue is not sufficient for guaranteeing safety when synchronous
 * streams is enabled. The reason is that tcp_fuse_rrw() doesn't enter
 * the squeue and its access to tcp_rcv_list and other fusion-related
 * fields needs to be synchronized with the sender. tcp_non_sq_lock is
 * used for this purpose. When there is urgent data, the sender needs
 * to push the data up the receiver's streams read queue.
 * In order to avoid holding the tcp_non_sq_lock across putnext(), the
 * sender sets the peer tcp's tcp_fuse_syncstr_plugged bit and releases
 * tcp_non_sq_lock (see macro TCP_FUSE_SYNCSTR_PLUG_DRAIN()). If
 * tcp_fuse_rrw() enters after this point, it will see that synchronous
 * streams is plugged and will wait on tcp_fuse_plugcv. After the sender
 * has finished pushing up all urgent data, it will clear the
 * tcp_fuse_syncstr_plugged bit using TCP_FUSE_SYNCSTR_UNPLUG_DRAIN().
 * This will cause any threads waiting on tcp_fuse_plugcv to return
 * EBUSY, and in turn cause strget() to call getq_noenab() to dequeue
 * data from the stream head instead. Once the data on the stream head
 * has been consumed, tcp_fuse_rrw() may again be used to process
 * tcp_rcv_list. However, if TCP_FUSE_SYNCSTR_STOP() has been called,
 * all future calls to tcp_fuse_rrw() will return EBUSY, effectively
 * disabling synchronous streams.
 *
 * The following note applies only to the synchronous streams mode.
 *
 * Flow control is done by checking the size of the receive buffer and
 * the number of data blocks, both set to different limits. This is
 * different from regular streams flow control, where the cumulative
 * size check dominates the block count check -- the streams queue high
 * water mark typically represents bytes. Each enqueue triggers
 * notifications to the receiving process; a buildup of data blocks
 * indicates a slow receiver and the sender should be blocked or
 * informed at the earliest moment instead of further wasting system
 * resources. In effect, this is equivalent to limiting the number of
 * outstanding segments in flight.
 */
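
/*
 * As a rough illustration of the plug/drain handshake described above
 * (simplified sequence, not actual code; error and shutdown paths are
 * omitted):
 *
 *	sender (urgent data)			receiver (tcp_fuse_rrw)
 *	--------------------------		--------------------------
 *	TCP_FUSE_SYNCSTR_PLUG_DRAIN()
 *	  sets tcp_fuse_syncstr_plugged,
 *	  drops tcp_non_sq_lock
 *	putnext() of urgent data		sees the plugged bit,
 *	...					cv_wait(&tcp_fuse_plugcv)
 *	TCP_FUSE_SYNCSTR_UNPLUG_DRAIN()
 *	  clears the bit, wakes waiters		returns EBUSY; strget()
 *						then calls getq_noenab()
 *						to read the stream head
 */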

/*
 * Setting this to false means we disable fusion altogether and
 * loopback connections will go through the regular protocol paths.
 */
boolean_t do_tcp_fusion = B_TRUE;

/*
 * Enabling this flag allows sockfs to retrieve data directly
 * from a fused tcp endpoint using the synchronous streams interface.
 */
boolean_t do_tcp_direct_sockfs = B_TRUE;

/*
 * This is the minimum number of outstanding writes allowed on
 * a synchronous streams-enabled receiving endpoint before the
 * sender gets flow-controlled. Setting this value to 0 means
 * that the data block limit is equivalent to the byte count
 * limit, which essentially disables the check.
 */
#define	TCP_FUSION_RCV_UNREAD_MIN	8
uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN;
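
/*
 * Illustrative example (hypothetical numbers): with a receive high
 * water mark of 64KB and an unread-block limit of 8, a sender issuing
 * a burst of 100-byte writes is flow-controlled by tcp_fuse_output()
 * after 8 unread mblks (only 800 bytes), long before the 64KB byte
 * limit is reached; a sender issuing large writes hits the byte limit
 * first. Setting tcp_fusion_rcv_unread_min to 0 removes the block
 * count check entirely.
 */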

static void	tcp_fuse_syncstr_enable(tcp_t *);
static void	tcp_fuse_syncstr_disable(tcp_t *);
static void	strrput_sig(queue_t *, boolean_t);

/*
 * Return true if this connection needs some IP functionality
 */
static boolean_t
tcp_loopback_needs_ip(tcp_t *tcp, netstack_t *ns)
{
        ipsec_stack_t *ipss = ns->netstack_ipsec;

        if (tcp->tcp_ipversion == IPV4_VERSION) {
                if (tcp->tcp_ip_hdr_len != IP_SIMPLE_HDR_LENGTH)
                        return (B_TRUE);
                if (CONN_OUTBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
                        return (B_TRUE);
                if (CONN_INBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
                        return (B_TRUE);
        } else {
                if (tcp->tcp_ip_hdr_len != IPV6_HDR_LEN)
                        return (B_TRUE);
                if (CONN_OUTBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
                        return (B_TRUE);
                if (CONN_INBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
                        return (B_TRUE);
        }
        if (!CONN_IS_LSO_MD_FASTPATH(tcp->tcp_connp))
                return (B_TRUE);
        return (B_FALSE);
}


/*
 * This routine gets called by the eager tcp upon changing state from
 * SYN_RCVD to ESTABLISHED. It fuses a direct path between itself
 * and the active connect tcp such that regular tcp processing
 * may be bypassed under allowable circumstances. Because the fusion
 * requires both endpoints to be in the same squeue, it does not work
 * for simultaneous active connects because there is no easy way to
 * switch from one squeue to another once the connection is created.
 * This is different from the eager tcp case where we assign it the
 * same squeue as the one given to the active connect tcp during open.
 */
void
tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcph_t *tcph)
{
        conn_t *peer_connp, *connp = tcp->tcp_connp;
        tcp_t *peer_tcp;
        tcp_stack_t *tcps = tcp->tcp_tcps;
        netstack_t *ns;
        ip_stack_t *ipst = tcps->tcps_netstack->netstack_ip;

        ASSERT(!tcp->tcp_fused);
        ASSERT(tcp->tcp_loopback);
        ASSERT(tcp->tcp_loopback_peer == NULL);
        /*
         * We need to inherit q_hiwat of the listener tcp, but we can't
         * really use tcp_listener since we get here after sending up
         * T_CONN_IND and tcp_wput_accept() may be called independently,
         * at which point tcp_listener is cleared; this is why we use
         * tcp_saved_listener. The listener itself is guaranteed to be
         * around until tcp_accept_finish() is called on this eager --
         * this won't happen until we're done since we're inside the
         * eager's perimeter now.
         */
        ASSERT(tcp->tcp_saved_listener != NULL);

        /*
         * Lookup peer endpoint; search for the remote endpoint having
         * the reversed address-port quadruplet in ESTABLISHED state,
         * which is guaranteed to be unique in the system. Zone check
         * is applied accordingly for loopback address, but not for
         * local address since we want fusion to happen across Zones.
         */
        if (tcp->tcp_ipversion == IPV4_VERSION) {
                peer_connp = ipcl_conn_tcp_lookup_reversed_ipv4(connp,
                    (ipha_t *)iphdr, tcph, ipst);
        } else {
                peer_connp = ipcl_conn_tcp_lookup_reversed_ipv6(connp,
                    (ip6_t *)iphdr, tcph, ipst);
        }

        /*
         * We can only proceed if the peer exists, resides in the same
         * squeue as our conn and is not a raw socket. The squeue
         * assignment of this eager tcp was done earlier at the time of
         * SYN processing in ip_fanout_tcp{_v6}. Note that a shared
         * squeue by itself doesn't guarantee a safe condition to fuse;
         * hence we perform additional tests below.
         */
        ASSERT(peer_connp == NULL || peer_connp != connp);
        if (peer_connp == NULL || peer_connp->conn_sqp != connp->conn_sqp ||
            !IPCL_IS_TCP(peer_connp)) {
                if (peer_connp != NULL) {
                        TCP_STAT(tcps, tcp_fusion_unqualified);
                        CONN_DEC_REF(peer_connp);
                }
                return;
        }
        peer_tcp = peer_connp->conn_tcp;	/* active connect tcp */

        ASSERT(peer_tcp != NULL && peer_tcp != tcp && !peer_tcp->tcp_fused);
        ASSERT(peer_tcp->tcp_loopback && peer_tcp->tcp_loopback_peer == NULL);
        ASSERT(peer_connp->conn_sqp == connp->conn_sqp);

        /*
         * Fuse the endpoints; we perform further checks against both
         * tcp endpoints to ensure that a fusion is allowed to happen.
         * In particular we bail out for non-simple TCP/IP or if IPsec/
         * IPQoS policy/kernel SSL exists.
         */
        ns = tcps->tcps_netstack;
        ipst = ns->netstack_ip;

        if (!tcp->tcp_unfusable && !peer_tcp->tcp_unfusable &&
            !tcp_loopback_needs_ip(tcp, ns) &&
            !tcp_loopback_needs_ip(peer_tcp, ns) &&
            tcp->tcp_kssl_ent == NULL &&
            !IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst)) {
                mblk_t *mp;
                struct stroptions *stropt;
                queue_t *peer_rq = peer_tcp->tcp_rq;

                ASSERT(!TCP_IS_DETACHED(peer_tcp) && peer_rq != NULL);
                ASSERT(tcp->tcp_fused_sigurg_mp == NULL);
                ASSERT(peer_tcp->tcp_fused_sigurg_mp == NULL);
                ASSERT(tcp->tcp_kssl_ctx == NULL);

                /*
                 * We need to drain data on both endpoints during unfuse.
                 * If we need to send up SIGURG at the time of draining,
                 * we want to be sure that an mblk is readily available.
                 * This is why we pre-allocate the M_PCSIG mblks for both
                 * endpoints which will only be used during/after unfuse.
                 */
                if ((mp = allocb(1, BPRI_HI)) == NULL)
                        goto failed;

                tcp->tcp_fused_sigurg_mp = mp;

                if ((mp = allocb(1, BPRI_HI)) == NULL)
                        goto failed;

                peer_tcp->tcp_fused_sigurg_mp = mp;

                /* Allocate M_SETOPTS mblk */
                if ((mp = allocb(sizeof (*stropt), BPRI_HI)) == NULL)
                        goto failed;

                /* Fuse both endpoints */
                peer_tcp->tcp_loopback_peer = tcp;
                tcp->tcp_loopback_peer = peer_tcp;
                peer_tcp->tcp_fused = tcp->tcp_fused = B_TRUE;

                /*
                 * We never use regular tcp paths in fusion and should
                 * therefore clear tcp_unsent on both endpoints. Having
                 * them set to non-zero values means asking for trouble
                 * especially after unfuse, where we may end up sending
                 * through regular tcp paths which expect xmit_list and
                 * friends to be correctly setup.
                 */
                peer_tcp->tcp_unsent = tcp->tcp_unsent = 0;

                tcp_timers_stop(tcp);
                tcp_timers_stop(peer_tcp);

                /*
                 * At this point we are a detached eager tcp and therefore
                 * don't have a queue assigned to us until accept happens.
                 * In the meantime the peer endpoint may immediately send
                 * us data as soon as fusion is finished, and we need to be
                 * able to flow control it in case it sends down huge
                 * amounts of data while we're still detached. To prevent
                 * that we inherit the listener's q_hiwat value; this is
                 * temporary since we'll repeat the process in
                 * tcp_accept_finish().
                 */
                (void) tcp_fuse_set_rcv_hiwat(tcp,
                    tcp->tcp_saved_listener->tcp_rq->q_hiwat);

                /*
                 * Set the stream head's write offset value to zero since we
                 * won't be needing any room for TCP/IP headers; tell it to
                 * not break up the writes (this would reduce the amount of
                 * work done by kmem); and configure our receive buffer.
                 * Note that we can only do this for the active connect tcp
                 * since our eager is still detached; it will be dealt with
                 * later in tcp_accept_finish().
                 */
                DB_TYPE(mp) = M_SETOPTS;
                mp->b_wptr += sizeof (*stropt);

                stropt = (struct stroptions *)mp->b_rptr;
                stropt->so_flags = SO_MAXBLK | SO_WROFF | SO_HIWAT;
                stropt->so_maxblk = tcp_maxpsz_set(peer_tcp, B_FALSE);
                stropt->so_wroff = 0;

                /*
                 * Record the stream head's high water mark for
                 * peer endpoint; this is used for flow-control
                 * purposes in tcp_fuse_output().
                 */
                stropt->so_hiwat = tcp_fuse_set_rcv_hiwat(peer_tcp,
                    peer_rq->q_hiwat);

                /* Send the options up */
                putnext(peer_rq, mp);
        } else {
                TCP_STAT(tcps, tcp_fusion_unqualified);
        }
        CONN_DEC_REF(peer_connp);
        return;

failed:
        if (tcp->tcp_fused_sigurg_mp != NULL) {
                freeb(tcp->tcp_fused_sigurg_mp);
                tcp->tcp_fused_sigurg_mp = NULL;
        }
        if (peer_tcp->tcp_fused_sigurg_mp != NULL) {
                freeb(peer_tcp->tcp_fused_sigurg_mp);
                peer_tcp->tcp_fused_sigurg_mp = NULL;
        }
        CONN_DEC_REF(peer_connp);
}
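
/*
 * To recap, fusion in tcp_fuse() above requires all of the following:
 * a peer found via the reversed-quadruplet lookup, both conns on the
 * same squeue, both endpoints TCP (not raw sockets), neither endpoint
 * marked unfusable, no extra IP functionality needed on either side
 * (simple IP header, no IPsec policy, LSO/MDT fastpath available),
 * no kernel SSL, no IPQoS policy, and successful allocation of the
 * two M_PCSIG mblks and the M_SETOPTS mblk.
 */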

/*
 * Unfuse a previously-fused pair of tcp loopback endpoints.
 */
void
tcp_unfuse(tcp_t *tcp)
{
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;

        ASSERT(tcp->tcp_fused && peer_tcp != NULL);
        ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
        ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
        ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);
        ASSERT(tcp->tcp_fused_sigurg_mp != NULL);
        ASSERT(peer_tcp->tcp_fused_sigurg_mp != NULL);

        /*
         * We disable synchronous streams, drain any queued data and
         * clear tcp_direct_sockfs. The synchronous streams entry
         * points will become no-ops after this point.
         */
        tcp_fuse_disable_pair(tcp, B_TRUE);

        /*
         * Update th_seq and th_ack in the header template
         */
        U32_TO_ABE32(tcp->tcp_snxt, tcp->tcp_tcph->th_seq);
        U32_TO_ABE32(tcp->tcp_rnxt, tcp->tcp_tcph->th_ack);
        U32_TO_ABE32(peer_tcp->tcp_snxt, peer_tcp->tcp_tcph->th_seq);
        U32_TO_ABE32(peer_tcp->tcp_rnxt, peer_tcp->tcp_tcph->th_ack);

        /* Unfuse the endpoints */
        peer_tcp->tcp_fused = tcp->tcp_fused = B_FALSE;
        peer_tcp->tcp_loopback_peer = tcp->tcp_loopback_peer = NULL;
}

/*
 * Fusion output routine for urgent data. This routine is called by
 * tcp_fuse_output() for handling non-M_DATA mblks.
 */
void
tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp)
{
        mblk_t *mp1;
        struct T_exdata_ind *tei;
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;
        mblk_t *head, *prev_head = NULL;
        tcp_stack_t *tcps = tcp->tcp_tcps;

        ASSERT(tcp->tcp_fused);
        ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
        ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO);
        ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA);
        ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0);

        /*
         * Urgent data arrives in the form of T_EXDATA_REQ from above.
         * Each occurrence denotes a new urgent pointer.
         * For each new urgent pointer we signal (SIGURG) the receiving
         * app to indicate that it needs to go into urgent mode. This is
         * similar to the urgent data handling in the regular tcp. We
         * don't need to keep track of where the urgent pointer is,
         * because each T_EXDATA_REQ "advances" the urgent pointer for us.
         *
         * The actual urgent data carried by T_EXDATA_REQ is then prepended
         * by a T_EXDATA_IND before being enqueued behind any existing data
         * destined for the receiving app. There is only a single urgent
         * pointer (out-of-band mark) for a given tcp. If the new urgent
         * data arrives before the receiving app reads some existing urgent
         * data, the previous marker is lost. This behavior is emulated
         * accordingly below, by removing any existing T_EXDATA_IND messages
         * and essentially converting old urgent data into non-urgent.
         */
        ASSERT(tcp->tcp_valid_bits & TCP_URG_VALID);
        /* Let sender get out of urgent mode */
        tcp->tcp_valid_bits &= ~TCP_URG_VALID;

        /*
         * This flag indicates that a signal needs to be sent up.
         * This flag will only get cleared once SIGURG is delivered and
         * is not affected by the tcp_fused flag -- delivery will still
         * happen even after an endpoint is unfused, to handle the case
         * where the sending endpoint immediately closes/unfuses after
         * sending urgent data and the accept is not yet finished.
         */
        peer_tcp->tcp_fused_sigurg = B_TRUE;

        /* Reuse T_EXDATA_REQ mblk for T_EXDATA_IND */
        DB_TYPE(mp) = M_PROTO;
        tei = (struct T_exdata_ind *)mp->b_rptr;
        tei->PRIM_type = T_EXDATA_IND;
        tei->MORE_flag = 0;
        mp->b_wptr = (uchar_t *)&tei[1];

        TCP_STAT(tcps, tcp_fusion_urg);
        BUMP_MIB(&tcps->tcps_mib, tcpOutUrg);

        head = peer_tcp->tcp_rcv_list;
        while (head != NULL) {
                /*
                 * Remove existing T_EXDATA_IND, keep the data which follows
                 * it and relink our list. Note that we don't modify the
                 * tcp_rcv_last_tail since it never points to T_EXDATA_IND.
                 */
                if (DB_TYPE(head) != M_DATA) {
                        mp1 = head;

                        ASSERT(DB_TYPE(mp1->b_cont) == M_DATA);
                        head = mp1->b_cont;
                        mp1->b_cont = NULL;
                        head->b_next = mp1->b_next;
                        mp1->b_next = NULL;
                        if (prev_head != NULL)
                                prev_head->b_next = head;
                        if (peer_tcp->tcp_rcv_list == mp1)
                                peer_tcp->tcp_rcv_list = head;
                        if (peer_tcp->tcp_rcv_last_head == mp1)
                                peer_tcp->tcp_rcv_last_head = head;
                        freeb(mp1);
                }
                prev_head = head;
                head = head->b_next;
        }
}
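
/*
 * As an illustration of the loop above, suppose the peer's receive
 * list looks like this before a new T_EXDATA_REQ arrives (b_cont
 * shown horizontally, b_next vertically):
 *
 *	T_EXDATA_IND--M_DATA (old urgent data)
 *	|
 *	M_DATA
 *
 * After the old T_EXDATA_IND is removed, the old urgent data has been
 * converted into ordinary in-band data:
 *
 *	M_DATA (old urgent data)
 *	|
 *	M_DATA
 *
 * and the caller (tcp_fuse_output()) then enqueues the new
 * T_EXDATA_IND chain behind it.
 */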

/*
 * Fusion output routine, called by tcp_output() and tcp_wput_proto().
 * If we are modifying any member that can be changed outside the squeue,
 * like tcp_flow_stopped, we need to take tcp_non_sq_lock.
 */
boolean_t
tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
{
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;
        uint_t max_unread;
        boolean_t flow_stopped;
        boolean_t urgent = (DB_TYPE(mp) != M_DATA);
        mblk_t *mp1 = mp;
        ill_t *ilp, *olp;
        ipha_t *ipha;
        ip6_t *ip6h;
        tcph_t *tcph;
        uint_t ip_hdr_len;
        uint32_t seq;
        uint32_t recv_size = send_size;
        tcp_stack_t *tcps = tcp->tcp_tcps;
        netstack_t *ns = tcps->tcps_netstack;
        ip_stack_t *ipst = ns->netstack_ip;

        ASSERT(tcp->tcp_fused);
        ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
        ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
        ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO ||
            DB_TYPE(mp) == M_PCPROTO);

        max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater;

        /* If this connection requires IP, unfuse and use regular path */
        if (tcp_loopback_needs_ip(tcp, ns) ||
            tcp_loopback_needs_ip(peer_tcp, ns) ||
            IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst)) {
                TCP_STAT(tcps, tcp_fusion_aborted);
                goto unfuse;
        }

        if (send_size == 0) {
                freemsg(mp);
                return (B_TRUE);
        }

        /*
         * Handle urgent data; we either send up SIGURG to the peer now
         * or do it later when we drain, in case the peer is detached
         * or if we're short of memory for M_PCSIG mblk.
         */
        if (urgent) {
                /*
                 * We stop synchronous streams when we have urgent data
                 * queued to prevent tcp_fuse_rrw() from pulling it. If
                 * for some reason the urgent data can't be delivered
                 * below, synchronous streams will remain stopped until
                 * someone drains the tcp_rcv_list.
                 */
                TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
                tcp_fuse_output_urg(tcp, mp);

                mp1 = mp->b_cont;
        }

        if (tcp->tcp_ipversion == IPV4_VERSION &&
            (HOOKS4_INTERESTED_LOOPBACK_IN(ipst) ||
            HOOKS4_INTERESTED_LOOPBACK_OUT(ipst)) ||
            tcp->tcp_ipversion == IPV6_VERSION &&
            (HOOKS6_INTERESTED_LOOPBACK_IN(ipst) ||
            HOOKS6_INTERESTED_LOOPBACK_OUT(ipst))) {
                /*
                 * Build ip and tcp header to satisfy FW_HOOKS.
                 * We only build it when any hook is present.
                 */
                if ((mp1 = tcp_xmit_mp(tcp, mp1, tcp->tcp_mss, NULL, NULL,
                    tcp->tcp_snxt, B_TRUE, NULL, B_FALSE)) == NULL)
                        /* If tcp_xmit_mp fails, use regular path */
                        goto unfuse;

                ASSERT(peer_tcp->tcp_connp->conn_ire_cache->ire_ipif != NULL);
                olp = peer_tcp->tcp_connp->conn_ire_cache->ire_ipif->ipif_ill;
                /* PFHooks: LOOPBACK_OUT */
                if (tcp->tcp_ipversion == IPV4_VERSION) {
                        ipha = (ipha_t *)mp1->b_rptr;

                        DTRACE_PROBE4(ip4__loopback__out__start,
                            ill_t *, NULL, ill_t *, olp,
                            ipha_t *, ipha, mblk_t *, mp1);
                        FW_HOOKS(ipst->ips_ip4_loopback_out_event,
                            ipst->ips_ipv4firewall_loopback_out,
                            NULL, olp, ipha, mp1, mp1, ipst);
                        DTRACE_PROBE1(ip4__loopback__out__end, mblk_t *, mp1);
                } else {
                        ip6h = (ip6_t *)mp1->b_rptr;

                        DTRACE_PROBE4(ip6__loopback__out__start,
                            ill_t *, NULL, ill_t *, olp,
                            ip6_t *, ip6h, mblk_t *, mp1);
                        FW_HOOKS6(ipst->ips_ip6_loopback_out_event,
                            ipst->ips_ipv6firewall_loopback_out,
                            NULL, olp, ip6h, mp1, mp1, ipst);
                        DTRACE_PROBE1(ip6__loopback__out__end, mblk_t *, mp1);
                }
                if (mp1 == NULL)
                        goto unfuse;


                /* PFHooks: LOOPBACK_IN */
                ASSERT(tcp->tcp_connp->conn_ire_cache->ire_ipif != NULL);
                ilp = tcp->tcp_connp->conn_ire_cache->ire_ipif->ipif_ill;

                if (tcp->tcp_ipversion == IPV4_VERSION) {
                        DTRACE_PROBE4(ip4__loopback__in__start,
                            ill_t *, ilp, ill_t *, NULL,
                            ipha_t *, ipha, mblk_t *, mp1);
                        FW_HOOKS(ipst->ips_ip4_loopback_in_event,
                            ipst->ips_ipv4firewall_loopback_in,
                            ilp, NULL, ipha, mp1, mp1, ipst);
                        DTRACE_PROBE1(ip4__loopback__in__end, mblk_t *, mp1);
                        if (mp1 == NULL)
                                goto unfuse;

                        ip_hdr_len = IPH_HDR_LENGTH(ipha);
                } else {
                        DTRACE_PROBE4(ip6__loopback__in__start,
                            ill_t *, ilp, ill_t *, NULL,
                            ip6_t *, ip6h, mblk_t *, mp1);
                        FW_HOOKS6(ipst->ips_ip6_loopback_in_event,
                            ipst->ips_ipv6firewall_loopback_in,
                            ilp, NULL, ip6h, mp1, mp1, ipst);
                        DTRACE_PROBE1(ip6__loopback__in__end, mblk_t *, mp1);
                        if (mp1 == NULL)
                                goto unfuse;

                        ip_hdr_len = ip_hdr_length_v6(mp1, ip6h);
                }

                /* Data length might be changed by FW_HOOKS */
                tcph = (tcph_t *)&mp1->b_rptr[ip_hdr_len];
                seq = ABE32_TO_U32(tcph->th_seq);
                recv_size += seq - tcp->tcp_snxt;

                /*
                 * The message duplicated by tcp_xmit_mp is freed.
                 * Note: the original message passed in remains unchanged.
                 */
                freemsg(mp1);
        }

        mutex_enter(&peer_tcp->tcp_non_sq_lock);
        /*
         * Wake up and signal the peer; it is okay to do this before
         * enqueueing because we are holding the lock. One of the
         * advantages of synchronous streams is the ability for us to
         * find out when the application performs a read on the socket,
         * by way of the tcp_fuse_rrw() entry point being called. All
         * data that gets enqueued onto the receiver is treated as if
         * it has arrived at the receiving endpoint, thus generating
         * SIGPOLL/SIGIO for an asynchronous socket just as in the
         * strrput() case. However, we only wake up the application when
         * necessary, i.e. during the first enqueue. When tcp_fuse_rrw()
         * is called it will send everything upstream.
         */
        if (peer_tcp->tcp_direct_sockfs && !urgent &&
            !TCP_IS_DETACHED(peer_tcp)) {
                if (peer_tcp->tcp_rcv_list == NULL)
                        STR_WAKEUP_SET(STREAM(peer_tcp->tcp_rq));
                /* Update poll events and send SIGPOLL/SIGIO if necessary */
                STR_SENDSIG(STREAM(peer_tcp->tcp_rq));
        }

        /*
         * Enqueue data into the peer's receive list; we may or may not
         * drain the contents depending on the conditions below.
         */
        tcp_rcv_enqueue(peer_tcp, mp, recv_size);

        /* In case it wrapped around and also to keep it constant */
        peer_tcp->tcp_rwnd += recv_size;

        /*
         * Exercise flow-control when needed; we will get back-enabled
         * in either tcp_accept_finish(), tcp_unfuse(), or tcp_fuse_rrw().
         * If tcp_direct_sockfs is on or if the peer endpoint is detached,
         * we emulate streams flow control by checking the peer's queue
         * size and high water mark; otherwise we simply use canputnext()
         * to decide if we need to stop our flow.
         *
         * The outstanding unread data block check does not apply for a
         * detached receiver; this is to avoid unnecessary blocking of the
         * sender while the accept is currently in progress and is quite
         * similar to the regular tcp.
         */
        if (TCP_IS_DETACHED(peer_tcp) || max_unread == 0)
                max_unread = UINT_MAX;

        /*
         * Since we are accessing our tcp_flow_stopped and might modify it,
         * we need to take tcp->tcp_non_sq_lock. The lock for the highest
         * address is held first. Dropping peer_tcp->tcp_non_sq_lock should
         * not be an issue here since we are within the squeue and the peer
         * won't disappear.
         */
        if (tcp > peer_tcp) {
                mutex_exit(&peer_tcp->tcp_non_sq_lock);
                mutex_enter(&tcp->tcp_non_sq_lock);
                mutex_enter(&peer_tcp->tcp_non_sq_lock);
        } else {
                mutex_enter(&tcp->tcp_non_sq_lock);
        }
        flow_stopped = tcp->tcp_flow_stopped;
        if (!flow_stopped &&
            (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) &&
            (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater ||
            ++peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
            (!peer_tcp->tcp_direct_sockfs &&
            !TCP_IS_DETACHED(peer_tcp) && !canputnext(peer_tcp->tcp_rq)))) {
                tcp_setqfull(tcp);
                flow_stopped = B_TRUE;
                TCP_STAT(tcps, tcp_fusion_flowctl);
                DTRACE_PROBE4(tcp__fuse__output__flowctl, tcp_t *, tcp,
                    uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt,
                    uint_t, peer_tcp->tcp_fuse_rcv_unread_cnt);
        } else if (flow_stopped &&
            TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater) {
                tcp_clrqfull(tcp);
                flow_stopped = B_FALSE;
        }
        mutex_exit(&tcp->tcp_non_sq_lock);
        ipst->ips_loopback_packets++;
        tcp->tcp_last_sent_len = send_size;

        /* Need to adjust the following SNMP MIB-related variables */
        tcp->tcp_snxt += send_size;
        tcp->tcp_suna = tcp->tcp_snxt;
        peer_tcp->tcp_rnxt += recv_size;
        peer_tcp->tcp_rack = peer_tcp->tcp_rnxt;

        BUMP_MIB(&tcps->tcps_mib, tcpOutDataSegs);
        UPDATE_MIB(&tcps->tcps_mib, tcpOutDataBytes, send_size);

        BUMP_MIB(&tcps->tcps_mib, tcpInSegs);
        BUMP_MIB(&tcps->tcps_mib, tcpInDataInorderSegs);
        UPDATE_MIB(&tcps->tcps_mib, tcpInDataInorderBytes, send_size);

        BUMP_LOCAL(tcp->tcp_obsegs);
        BUMP_LOCAL(peer_tcp->tcp_ibsegs);

        mutex_exit(&peer_tcp->tcp_non_sq_lock);

        DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);

        if (!TCP_IS_DETACHED(peer_tcp)) {
                /*
                 * Drain the peer's receive queue if it has urgent data or
                 * if we're not flow-controlled. There is no need for
                 * draining normal data when tcp_direct_sockfs is on
                 * because the peer will pull the data via tcp_fuse_rrw().
                 */
                if (urgent ||
                    (!flow_stopped && !peer_tcp->tcp_direct_sockfs)) {
                        ASSERT(peer_tcp->tcp_rcv_list != NULL);
                        /*
                         * For TLI-based streams, a thread in
                         * tcp_accept_swap() can race with us. That thread
                         * will ensure that the correct peer_tcp->tcp_rq is
                         * globally visible before peer_tcp->tcp_detached is
                         * visible as clear, but we must also ensure that
                         * the load of tcp_rq cannot be reordered to be
                         * before the tcp_detached check.
                         */
                        membar_consumer();
                        (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
                            NULL);
                        /*
                         * If synchronous streams was stopped above due
                         * to the presence of urgent data, re-enable it.
                         */
                        if (urgent)
                                TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);
                }
        }
        return (B_TRUE);
unfuse:
        tcp_unfuse(tcp);
        return (B_FALSE);
}
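
/*
 * The address-based lock ordering used above can be summarized by the
 * following sketch (illustrative pseudocode only; 'a' and 'b' are the
 * two fused endpoints):
 *
 *	if (a > b) {
 *		mutex_enter(&a->tcp_non_sq_lock);
 *		mutex_enter(&b->tcp_non_sq_lock);
 *	} else {
 *		mutex_enter(&b->tcp_non_sq_lock);
 *		mutex_enter(&a->tcp_non_sq_lock);
 *	}
 *
 * Since every thread agrees on which of the two addresses is higher,
 * the locks are always acquired in the same global order and the
 * sender and receiver cannot deadlock against each other.
 */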

/*
 * This routine gets called to deliver data upstream on a fused or
 * previously fused tcp loopback endpoint; the latter happens only
 * when there is a pending SIGURG signal plus urgent data that couldn't
 * be sent upstream earlier.
 */
boolean_t
tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
{
        mblk_t *mp;
#ifdef DEBUG
        uint_t cnt = 0;
#endif
        tcp_stack_t *tcps = tcp->tcp_tcps;

        ASSERT(tcp->tcp_loopback);
        ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
        ASSERT(!tcp->tcp_fused || tcp->tcp_loopback_peer != NULL);
        ASSERT(sigurg_mpp != NULL || tcp->tcp_fused);

        /* No need for the push timer now, in case it was scheduled */
        if (tcp->tcp_push_tid != 0) {
                (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
                tcp->tcp_push_tid = 0;
        }
        /*
         * If there's urgent data sitting in the receive list and we
         * didn't get a chance to send up a SIGURG signal, make sure we
         * send it first before draining in order to ensure that
         * SIOCATMARK works properly.
         */
        if (tcp->tcp_fused_sigurg) {
                /*
                 * sigurg_mpp is normally NULL, i.e. when we're still
                 * fused and didn't get here because of tcp_unfuse().
                 * In this case try hard to allocate the M_PCSIG mblk.
                 */
                if (sigurg_mpp == NULL &&
                    (mp = allocb(1, BPRI_HI)) == NULL &&
                    (mp = allocb_tryhard(1)) == NULL) {
                        /* Alloc failed; try again next time */
                        tcp->tcp_push_tid = TCP_TIMER(tcp, tcp_push_timer,
                            MSEC_TO_TICK(tcps->tcps_push_timer_interval));
                        return (B_TRUE);
                } else if (sigurg_mpp != NULL) {
                        /*
                         * Use the supplied M_PCSIG mblk; it means we're
                         * either unfused or in the process of unfusing,
                         * and the drain must happen now.
                         */
                        mp = *sigurg_mpp;
                        *sigurg_mpp = NULL;
                }
                ASSERT(mp != NULL);

                tcp->tcp_fused_sigurg = B_FALSE;
                /* Send up the signal */
                DB_TYPE(mp) = M_PCSIG;
                *mp->b_wptr++ = (uchar_t)SIGURG;
                putnext(q, mp);
                /*
                 * Let the regular tcp_rcv_drain() path handle
                 * draining the data if we're no longer fused.
                 */
                if (!tcp->tcp_fused)
                        return (B_FALSE);
        }

        /*
         * In the synchronous streams case, we generate SIGPOLL/SIGIO for
         * each M_DATA that gets enqueued onto the receiver. At this point
         * we are about to drain any queued data via putnext(). In order
         * to avoid extraneous signal generation from strrput(), we set
         * the STRGETINPROG flag at the stream head prior to the draining
         * and restore it afterwards. This masks out signal generation
         * only for M_DATA messages and does not affect urgent data.
         */
        if (tcp->tcp_direct_sockfs)
                strrput_sig(q, B_FALSE);

        /* Drain the data */
        while ((mp = tcp->tcp_rcv_list) != NULL) {
                tcp->tcp_rcv_list = mp->b_next;
                mp->b_next = NULL;
#ifdef DEBUG
                cnt += msgdsize(mp);
#endif
                putnext(q, mp);
                TCP_STAT(tcps, tcp_fusion_putnext);
        }

        if (tcp->tcp_direct_sockfs)
                strrput_sig(q, B_TRUE);

        ASSERT(cnt == tcp->tcp_rcv_cnt);
        tcp->tcp_rcv_last_head = NULL;
        tcp->tcp_rcv_last_tail = NULL;
        tcp->tcp_rcv_cnt = 0;
        tcp->tcp_fuse_rcv_unread_cnt = 0;
        tcp->tcp_rwnd = q->q_hiwat;

        return (B_TRUE);
}

/*
 * Synchronous stream entry point for sockfs to retrieve
 * data directly from tcp_rcv_list.
 * tcp_fuse_rrw() might end up modifying the peer's tcp_flow_stopped,
 * for which it must take the peer's tcp_non_sq_lock as well before
 * making any change. The order of taking the locks is based on
 * the TCP pointer itself. Before we get the peer we need to take
 * our tcp_non_sq_lock so that the peer doesn't disappear. However,
 * we cannot drop the lock if we have to grab the peer's lock (because
 * of ordering), since the peer might disappear in the interim. So,
 * we take our tcp_non_sq_lock, get the peer, increment the ref on the
 * peer's conn, drop all the locks and then take the tcp_non_sq_lock in
 * the desired order. Incrementing the conn ref on the peer means that
 * the peer won't disappear when we drop our tcp_non_sq_lock.
 */
int
tcp_fuse_rrw(queue_t *q, struiod_t *dp)
{
        tcp_t *tcp = Q_TO_CONN(q)->conn_tcp;
        mblk_t *mp;
        tcp_t *peer_tcp;
        tcp_stack_t *tcps = tcp->tcp_tcps;

        mutex_enter(&tcp->tcp_non_sq_lock);

        /*
         * If tcp_fuse_syncstr_plugged is set, then another thread is
         * moving the underlying data to the stream head. We need to wait
         * until it's done, then return EBUSY so that strget() will
         * dequeue data from the stream head to ensure data is drained
         * in-order.
         */
plugged:
        if (tcp->tcp_fuse_syncstr_plugged) {
                do {
                        cv_wait(&tcp->tcp_fuse_plugcv,
                            &tcp->tcp_non_sq_lock);
                } while (tcp->tcp_fuse_syncstr_plugged);

                mutex_exit(&tcp->tcp_non_sq_lock);
                TCP_STAT(tcps, tcp_fusion_rrw_plugged);
                TCP_STAT(tcps, tcp_fusion_rrw_busy);
                return (EBUSY);
        }

        peer_tcp = tcp->tcp_loopback_peer;

        /*
         * If someone had turned off tcp_direct_sockfs or if synchronous
         * streams is stopped, we return EBUSY. This causes strget() to
         * dequeue data from the stream head instead.
         */
        if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) {
                mutex_exit(&tcp->tcp_non_sq_lock);
                TCP_STAT(tcps, tcp_fusion_rrw_busy);
                return (EBUSY);
        }

        /*
         * Grab the locks in order. The highest addressed tcp is locked
         * first. We don't do this within the tcp_rcv_list check since if
         * we have to drop the lock, for ordering, then the tcp_rcv_list
         * could change.
         */
        if (peer_tcp > tcp) {
                CONN_INC_REF(peer_tcp->tcp_connp);
                mutex_exit(&tcp->tcp_non_sq_lock);
                mutex_enter(&peer_tcp->tcp_non_sq_lock);
                mutex_enter(&tcp->tcp_non_sq_lock);
                CONN_DEC_REF(peer_tcp->tcp_connp);
                /* This might have changed in the interim */
                if (tcp->tcp_fuse_syncstr_plugged) {
                        mutex_exit(&peer_tcp->tcp_non_sq_lock);
                        goto plugged;
                }
        } else {
                mutex_enter(&peer_tcp->tcp_non_sq_lock);
        }

        if ((mp = tcp->tcp_rcv_list) != NULL) {

                DTRACE_PROBE3(tcp__fuse__rrw, tcp_t *, tcp,
                    uint32_t, tcp->tcp_rcv_cnt, ssize_t,
                    dp->d_uio.uio_resid);

                tcp->tcp_rcv_list = NULL;
                TCP_STAT(tcps, tcp_fusion_rrw_msgcnt);

                /*
                 * At this point nothing should be left in tcp_rcv_list.
                 * The only possible case where we would have a chain of
                 * b_next-linked messages is urgent data, but we wouldn't
                 * be here if that's true since urgent data is delivered
                 * via putnext() and synchronous streams is stopped until
                 * tcp_fuse_rcv_drain() is finished.
                 */
                ASSERT(DB_TYPE(mp) == M_DATA && mp->b_next == NULL);

                tcp->tcp_rcv_last_head = NULL;
                tcp->tcp_rcv_last_tail = NULL;
                tcp->tcp_rcv_cnt = 0;
                tcp->tcp_fuse_rcv_unread_cnt = 0;

                if (peer_tcp->tcp_flow_stopped) {
                        tcp_clrqfull(peer_tcp);
                        TCP_STAT(tcps, tcp_fusion_backenabled);
                }
        }
        mutex_exit(&peer_tcp->tcp_non_sq_lock);
        /*
         * Either we just dequeued everything or we get here from sockfs
         * and have nothing to return; in this case clear RSLEEP.
         */
        ASSERT(tcp->tcp_rcv_last_head == NULL);
        ASSERT(tcp->tcp_rcv_last_tail == NULL);
        ASSERT(tcp->tcp_rcv_cnt == 0);
        ASSERT(tcp->tcp_fuse_rcv_unread_cnt == 0);
        STR_WAKEUP_CLEAR(STREAM(q));

        mutex_exit(&tcp->tcp_non_sq_lock);
        dp->d_mp = mp;
        return (0);
}
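
/*
 * A note on the EBUSY returns above: sockfs interprets EBUSY from
 * this synchronous streams entry point as "fall back to the stream
 * head", i.e. strget() will call getq_noenab() to dequeue whatever
 * has already been putnext()'ed there. This keeps data ordered
 * whenever the direct tcp_rcv_list path is plugged or stopped.
 */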

/*
 * Synchronous stream entry point used by certain ioctls to retrieve
 * information about or peek into the tcp_rcv_list.
 */
int
tcp_fuse_rinfop(queue_t *q, infod_t *dp)
{
        tcp_t *tcp = Q_TO_CONN(q)->conn_tcp;
        mblk_t *mp;
        uint_t cmd = dp->d_cmd;
        int res = 0;
        int error = 0;
        struct stdata *stp = STREAM(q);

        mutex_enter(&tcp->tcp_non_sq_lock);
        /* If shutdown on read has happened, return nothing */
        mutex_enter(&stp->sd_lock);
        if (stp->sd_flag & STREOF) {
                mutex_exit(&stp->sd_lock);
                goto done;
        }
        mutex_exit(&stp->sd_lock);

        /*
         * It is OK not to return an answer if tcp_rcv_list is
         * currently not accessible.
         */
        if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped ||
            tcp->tcp_fuse_syncstr_plugged ||
            (mp = tcp->tcp_rcv_list) == NULL)
                goto done;

        if (cmd & INFOD_COUNT) {
                /*
                 * We have at least one message and
                 * could return only one at a time.
                 */
                dp->d_count++;
                res |= INFOD_COUNT;
        }
        if (cmd & INFOD_BYTES) {
                /*
                 * Return size of all data messages.
                 */
                dp->d_bytes += tcp->tcp_rcv_cnt;
                res |= INFOD_BYTES;
        }
        if (cmd & INFOD_FIRSTBYTES) {
                /*
                 * Return size of first data message.
                 */
                dp->d_bytes = msgdsize(mp);
                res |= INFOD_FIRSTBYTES;
                dp->d_cmd &= ~INFOD_FIRSTBYTES;
        }
        if (cmd & INFOD_COPYOUT) {
                mblk_t *mp1;
                int n;

                if (DB_TYPE(mp) == M_DATA) {
                        mp1 = mp;
                } else {
                        mp1 = mp->b_cont;
                        ASSERT(mp1 != NULL);
                }

                /*
                 * Return data contents of first message.
                 */
                ASSERT(DB_TYPE(mp1) == M_DATA);
                while (mp1 != NULL && dp->d_uiop->uio_resid > 0) {
                        n = MIN(dp->d_uiop->uio_resid, MBLKL(mp1));
                        if (n != 0 && (error = uiomove((char *)mp1->b_rptr,
                            n, UIO_READ, dp->d_uiop)) != 0) {
                                goto done;
                        }
                        mp1 = mp1->b_cont;
                }
                res |= INFOD_COPYOUT;
                dp->d_cmd &= ~INFOD_COPYOUT;
        }
done:
        mutex_exit(&tcp->tcp_non_sq_lock);

        dp->d_res |= res;

        return (error);
}

/*
 * Enable synchronous streams on a fused tcp loopback endpoint.
 */
static void
tcp_fuse_syncstr_enable(tcp_t *tcp)
{
        queue_t *rq = tcp->tcp_rq;
        struct stdata *stp = STREAM(rq);

        /* We can only enable synchronous streams for sockfs mode */
        tcp->tcp_direct_sockfs = tcp->tcp_issocket && do_tcp_direct_sockfs;

        if (!tcp->tcp_direct_sockfs)
                return;

        mutex_enter(&stp->sd_lock);
        mutex_enter(QLOCK(rq));

        /*
         * We replace our q_qinfo with one that has the qi_rwp entry point.
         * Clear SR_SIGALLDATA because we generate the equivalent signal(s)
         * for every enqueued data in tcp_fuse_output().
         */
        rq->q_qinfo = &tcp_loopback_rinit;
        rq->q_struiot = tcp_loopback_rinit.qi_struiot;
        stp->sd_struiordq = rq;
        stp->sd_rput_opt &= ~SR_SIGALLDATA;

        mutex_exit(QLOCK(rq));
        mutex_exit(&stp->sd_lock);
}

/*
 * Disable synchronous streams on a fused tcp loopback endpoint.
 */
static void
tcp_fuse_syncstr_disable(tcp_t *tcp)
{
        queue_t *rq = tcp->tcp_rq;
        struct stdata *stp = STREAM(rq);

        if (!tcp->tcp_direct_sockfs)
                return;

        mutex_enter(&stp->sd_lock);
        mutex_enter(QLOCK(rq));

        /*
         * Reset q_qinfo to point to the default tcp entry points.
         * Also restore SR_SIGALLDATA so that strrput() can generate
         * the signals again for future M_DATA messages.
         */
        rq->q_qinfo = &tcp_rinit;
        rq->q_struiot = tcp_rinit.qi_struiot;
        stp->sd_struiordq = NULL;
        stp->sd_rput_opt |= SR_SIGALLDATA;
        tcp->tcp_direct_sockfs = B_FALSE;

        mutex_exit(QLOCK(rq));
        mutex_exit(&stp->sd_lock);
}
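
/*
 * The net effect of the enable/disable routines above is a swap of
 * the read queue's entry points, roughly:
 *
 *	enabled:  rq->q_qinfo = &tcp_loopback_rinit (whose qi_rwp leads
 *	          to tcp_fuse_rrw()), sd_struiordq = rq, and
 *	          SR_SIGALLDATA cleared
 *	disabled: rq->q_qinfo = &tcp_rinit (the default tcp entry
 *	          points), sd_struiordq = NULL, and SR_SIGALLDATA
 *	          restored
 */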

/*
 * Enable synchronous streams on a pair of fused tcp endpoints.
 */
void
tcp_fuse_syncstr_enable_pair(tcp_t *tcp)
{
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;

        ASSERT(tcp->tcp_fused);
        ASSERT(peer_tcp != NULL);

        tcp_fuse_syncstr_enable(tcp);
        tcp_fuse_syncstr_enable(peer_tcp);
}

/*
 * Allow or disallow signals to be generated by strrput().
 */
static void
strrput_sig(queue_t *q, boolean_t on)
{
        struct stdata *stp = STREAM(q);

        mutex_enter(&stp->sd_lock);
        if (on)
                stp->sd_flag &= ~STRGETINPROG;
        else
                stp->sd_flag |= STRGETINPROG;
        mutex_exit(&stp->sd_lock);
}

/*
 * Disable synchronous streams on a pair of fused tcp endpoints and drain
 * any queued data; called either during unfuse or upon transitioning from
 * a socket to a stream endpoint due to _SIOCSOCKFALLBACK.
 */
void
tcp_fuse_disable_pair(tcp_t *tcp, boolean_t unfusing)
{
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;
        tcp_stack_t *tcps = tcp->tcp_tcps;

        ASSERT(tcp->tcp_fused);
        ASSERT(peer_tcp != NULL);

        /*
         * Force any tcp_fuse_rrw() calls to block until we've moved the
         * data onto the stream head.
         */
        TCP_FUSE_SYNCSTR_PLUG_DRAIN(tcp);
        TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);

        /*
         * Drain any pending data; the detached check is needed because
         * we may be called as a result of a tcp_unfuse() triggered by
         * tcp_fuse_output(). Note that in case of a detached tcp, the
         * draining will happen later after the tcp is unfused. For non-
         * urgent data, this can be handled by the regular tcp_rcv_drain().
         * If we have urgent data sitting in the receive list, we will
         * need to send up a SIGURG signal first before draining the data.
         * All of these will be handled by the code in tcp_fuse_rcv_drain()
         * when called from tcp_rcv_drain().
         */
        if (!TCP_IS_DETACHED(tcp)) {
                (void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp,
                    (unfusing ? &tcp->tcp_fused_sigurg_mp : NULL));
        }
        if (!TCP_IS_DETACHED(peer_tcp)) {
                (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
                    (unfusing ? &peer_tcp->tcp_fused_sigurg_mp : NULL));
        }

        /*
         * Make all current and future tcp_fuse_rrw() calls fail with
         * EBUSY. To ensure threads don't sneak past the checks in
         * tcp_fuse_rrw(), a given stream must be stopped prior to being
         * unplugged (but the ordering of operations between the streams
         * is unimportant).
         */
        TCP_FUSE_SYNCSTR_STOP(tcp);
        TCP_FUSE_SYNCSTR_STOP(peer_tcp);
        TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(tcp);
        TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);

        /* Lift up any flow-control conditions */
        if (tcp->tcp_flow_stopped) {
                tcp_clrqfull(tcp);
                TCP_STAT(tcps, tcp_fusion_backenabled);
        }
        if (peer_tcp->tcp_flow_stopped) {
                tcp_clrqfull(peer_tcp);
                TCP_STAT(tcps, tcp_fusion_backenabled);
        }

        /* Disable synchronous streams */
        tcp_fuse_syncstr_disable(tcp);
        tcp_fuse_syncstr_disable(peer_tcp);
}

/*
 * Calculate the size of the receive buffer for a fused tcp endpoint.
 */
size_t
tcp_fuse_set_rcv_hiwat(tcp_t *tcp, size_t rwnd)
{
        tcp_stack_t *tcps = tcp->tcp_tcps;

        ASSERT(tcp->tcp_fused);

        /* Ensure that value is within the maximum upper bound */
        if (rwnd > tcps->tcps_max_buf)
                rwnd = tcps->tcps_max_buf;

        /* Obey the absolute minimum tcp receive high water mark */
        if (rwnd < tcps->tcps_sth_rcv_hiwat)
                rwnd = tcps->tcps_sth_rcv_hiwat;

        /*
         * Round up to system page size in case SO_RCVBUF is modified
         * after SO_SNDBUF; the latter is also similarly rounded up.
         */
        rwnd = P2ROUNDUP_TYPED(rwnd, PAGESIZE, size_t);
        tcp->tcp_fuse_rcv_hiwater = rwnd;
        return (rwnd);
}
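
/*
 * Worked example for the calculation above (hypothetical tunable
 * values): with tcps_max_buf of 1MB, tcps_sth_rcv_hiwat of 24KB and
 * a PAGESIZE of 8KB, a requested rwnd of 100 bytes is first raised
 * to 24KB and is already page-aligned; a requested rwnd of 2MB is
 * clamped down to 1MB.
 */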

/*
 * Calculate the maximum number of outstanding unread data blocks for a
 * fused tcp endpoint.
 */
int
tcp_fuse_maxpsz_set(tcp_t *tcp)
{
        tcp_t *peer_tcp = tcp->tcp_loopback_peer;
        uint_t sndbuf = tcp->tcp_xmit_hiwater;
        uint_t maxpsz = sndbuf;

        ASSERT(tcp->tcp_fused);
        ASSERT(peer_tcp != NULL);
        ASSERT(peer_tcp->tcp_fuse_rcv_hiwater != 0);
        /*
         * In the fused loopback case, we want the stream head to split
         * up larger writes into smaller chunks for a more accurate flow-
         * control accounting. Our maxpsz is half of the sender's send
         * buffer or the receiver's receive buffer, whichever is smaller.
         * We round up the buffer to system page size due to the lack of
         * a TCP MSS concept in Fusion.
         */
        if (maxpsz > peer_tcp->tcp_fuse_rcv_hiwater)
                maxpsz = peer_tcp->tcp_fuse_rcv_hiwater;
        maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1;

        /*
         * Calculate the peer's limit for the number of outstanding unread
         * data blocks. This is the number of data blocks that are allowed
         * to reside in the receiver's queue before the sender gets flow
         * controlled. It is used only in the synchronous streams mode as
         * a way to throttle the sender when it performs consecutive
         * writes faster than they can be read. The value is derived from
         * SO_SNDBUF in order to give the sender some control; we divide
         * it by a large value (16KB) to produce a fairly low initial
         * limit.
         */
        if (tcp_fusion_rcv_unread_min == 0) {
                /* A value of 0 means that we disable the check */
                peer_tcp->tcp_fuse_rcv_unread_hiwater = 0;
        } else {
                peer_tcp->tcp_fuse_rcv_unread_hiwater =
                    MAX(sndbuf >> 14, tcp_fusion_rcv_unread_min);
        }
        return (maxpsz);
}
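
/*
 * Worked example for tcp_fuse_maxpsz_set() (hypothetical buffer
 * sizes): with a 32KB send buffer, a 48KB peer receive high water
 * mark and 4KB pages, maxpsz is min(32KB, 48KB) rounded up to a page
 * boundary and halved, i.e. 16KB; the peer's unread-block limit is
 * MAX(32768 >> 14, 8) = MAX(2, 8) = 8 with the default
 * tcp_fusion_rcv_unread_min.
 */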