xref: /titanic_52/usr/src/uts/common/inet/tcp/tcp_fusion.c (revision 3e4f1187bdb4da8d8dea21f85a0bc51bd8c5a35e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/stream.h>
28 #include <sys/strsun.h>
29 #include <sys/strsubr.h>
30 #include <sys/debug.h>
31 #include <sys/sdt.h>
32 #include <sys/cmn_err.h>
33 #include <sys/tihdr.h>
34 
35 #include <inet/common.h>
36 #include <inet/optcom.h>
37 #include <inet/ip.h>
38 #include <inet/ip_if.h>
39 #include <inet/ip_impl.h>
40 #include <inet/tcp.h>
41 #include <inet/tcp_impl.h>
42 #include <inet/ipsec_impl.h>
43 #include <inet/ipclassifier.h>
44 #include <inet/ipp_common.h>
45 #include <inet/ip_if.h>
46 
47 /*
48  * This file implements TCP fusion - a protocol-less data path for TCP
49  * loopback connections.  The fusion of two local TCP endpoints occurs
50  * at connection establishment time.  Various conditions (see details
51  * in tcp_fuse()) need to be met for fusion to be successful.  If it
52  * fails, we fall back to the regular TCP data path; if it succeeds,
53  * both endpoints proceed to use tcp_fuse_output() as the transmit path.
54  * tcp_fuse_output() enqueues application data directly onto the peer's
55  * receive queue; no protocol processing is involved.
56  *
57  * Sychronization is handled by squeue and the mutex tcp_non_sq_lock.
58  * One of the requirements for fusion to succeed is that both endpoints
59  * need to be using the same squeue.  This ensures that neither side
60  * can disappear while the other side is still sending data. Flow
61  * control information is manipulated outside the squeue, so the
62  * tcp_non_sq_lock must be held when touching tcp_flow_stopped.
63  */
64 
65 /*
66  * Setting this to false means we disable fusion altogether and
67  * loopback connections would go through the protocol paths.
68  */
69 boolean_t do_tcp_fusion = B_TRUE;
70 
71 /*
72  * This routine gets called by the eager tcp upon changing state from
73  * SYN_RCVD to ESTABLISHED.  It fuses a direct path between itself
74  * and the active connect tcp such that the regular tcp processings
75  * may be bypassed under allowable circumstances.  Because the fusion
76  * requires both endpoints to be in the same squeue, it does not work
77  * for simultaneous active connects because there is no easy way to
78  * switch from one squeue to another once the connection is created.
79  * This is different from the eager tcp case where we assign it the
80  * same squeue as the one given to the active connect tcp during open.
81  */
82 void
83 tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcpha_t *tcpha)
84 {
85 	conn_t		*peer_connp, *connp = tcp->tcp_connp;
86 	tcp_t		*peer_tcp;
87 	tcp_stack_t	*tcps = tcp->tcp_tcps;
88 	netstack_t	*ns;
89 	ip_stack_t	*ipst = tcps->tcps_netstack->netstack_ip;
90 
91 	ASSERT(!tcp->tcp_fused);
92 	ASSERT(tcp->tcp_loopback);
93 	ASSERT(tcp->tcp_loopback_peer == NULL);
94 	/*
95 	 * We need to inherit conn_rcvbuf of the listener tcp,
96 	 * but we can't really use tcp_listener since we get here after
97 	 * sending up T_CONN_IND and tcp_tli_accept() may be called
98 	 * independently, at which point tcp_listener is cleared;
99 	 * this is why we use tcp_saved_listener. The listener itself
100 	 * is guaranteed to be around until tcp_accept_finish() is called
101 	 * on this eager -- this won't happen until we're done since we're
102 	 * inside the eager's perimeter now.
103 	 */
104 	ASSERT(tcp->tcp_saved_listener != NULL);
105 	/*
106 	 * Lookup peer endpoint; search for the remote endpoint having
107 	 * the reversed address-port quadruplet in ESTABLISHED state,
108 	 * which is guaranteed to be unique in the system.  Zone check
109 	 * is applied accordingly for loopback address, but not for
110 	 * local address since we want fusion to happen across Zones.
111 	 */
112 	if (connp->conn_ipversion == IPV4_VERSION) {
113 		peer_connp = ipcl_conn_tcp_lookup_reversed_ipv4(connp,
114 		    (ipha_t *)iphdr, tcpha, ipst);
115 	} else {
116 		peer_connp = ipcl_conn_tcp_lookup_reversed_ipv6(connp,
117 		    (ip6_t *)iphdr, tcpha, ipst);
118 	}
119 
120 	/*
121 	 * We can only proceed if peer exists, resides in the same squeue
122 	 * as our conn and is not raw-socket. We also restrict fusion to
123 	 * endpoints of the same type (STREAMS or non-STREAMS). The squeue
124 	 * assignment of this eager tcp was done earlier at the time of SYN
125 	 * processing in ip_fanout_tcp{_v6}.  Note that similar squeues by
126 	 * itself doesn't guarantee a safe condition to fuse, hence we perform
127 	 * additional tests below.
128 	 */
129 	ASSERT(peer_connp == NULL || peer_connp != connp);
130 	if (peer_connp == NULL || peer_connp->conn_sqp != connp->conn_sqp ||
131 	    !IPCL_IS_TCP(peer_connp) ||
132 	    IPCL_IS_NONSTR(connp) != IPCL_IS_NONSTR(peer_connp)) {
133 		if (peer_connp != NULL) {
134 			TCP_STAT(tcps, tcp_fusion_unqualified);
135 			CONN_DEC_REF(peer_connp);
136 		}
137 		return;
138 	}
139 	peer_tcp = peer_connp->conn_tcp;	/* active connect tcp */
140 
141 	ASSERT(peer_tcp != NULL && peer_tcp != tcp && !peer_tcp->tcp_fused);
142 	ASSERT(peer_tcp->tcp_loopback_peer == NULL);
143 	ASSERT(peer_connp->conn_sqp == connp->conn_sqp);
144 
145 	/*
146 	 * Due to IRE changes the peer and us might not agree on tcp_loopback.
147 	 * We bail in that case.
148 	 */
149 	if (!peer_tcp->tcp_loopback) {
150 		TCP_STAT(tcps, tcp_fusion_unqualified);
151 		CONN_DEC_REF(peer_connp);
152 		return;
153 	}
154 	/*
155 	 * Fuse the endpoints; we perform further checks against both
156 	 * tcp endpoints to ensure that a fusion is allowed to happen.
157 	 * In particular we bail out if kernel SSL exists.
158 	 */
159 	ns = tcps->tcps_netstack;
160 	ipst = ns->netstack_ip;
161 
162 	if (!tcp->tcp_unfusable && !peer_tcp->tcp_unfusable &&
163 	    (tcp->tcp_kssl_ent == NULL) && (tcp->tcp_xmit_head == NULL) &&
164 	    (peer_tcp->tcp_xmit_head == NULL)) {
165 		mblk_t *mp;
166 		queue_t *peer_rq = peer_connp->conn_rq;
167 
168 		ASSERT(!TCP_IS_DETACHED(peer_tcp));
169 		ASSERT(tcp->tcp_fused_sigurg_mp == NULL);
170 		ASSERT(peer_tcp->tcp_fused_sigurg_mp == NULL);
171 		ASSERT(tcp->tcp_kssl_ctx == NULL);
172 
173 		/*
174 		 * We need to drain data on both endpoints during unfuse.
175 		 * If we need to send up SIGURG at the time of draining,
176 		 * we want to be sure that an mblk is readily available.
177 		 * This is why we pre-allocate the M_PCSIG mblks for both
178 		 * endpoints which will only be used during/after unfuse.
179 		 * The mblk might already exist if we are doing a re-fuse.
180 		 */
181 		if (!IPCL_IS_NONSTR(tcp->tcp_connp)) {
182 			ASSERT(!IPCL_IS_NONSTR(peer_tcp->tcp_connp));
183 
184 			if (tcp->tcp_fused_sigurg_mp == NULL) {
185 				if ((mp = allocb(1, BPRI_HI)) == NULL)
186 					goto failed;
187 				tcp->tcp_fused_sigurg_mp = mp;
188 			}
189 
190 			if (peer_tcp->tcp_fused_sigurg_mp == NULL) {
191 				if ((mp = allocb(1, BPRI_HI)) == NULL)
192 					goto failed;
193 				peer_tcp->tcp_fused_sigurg_mp = mp;
194 			}
195 
196 			if ((mp = allocb(sizeof (struct stroptions),
197 			    BPRI_HI)) == NULL)
198 				goto failed;
199 		}
200 
201 		/* Fuse both endpoints */
202 		peer_tcp->tcp_loopback_peer = tcp;
203 		tcp->tcp_loopback_peer = peer_tcp;
204 		peer_tcp->tcp_fused = tcp->tcp_fused = B_TRUE;
205 
206 		/*
207 		 * We never use regular tcp paths in fusion and should
208 		 * therefore clear tcp_unsent on both endpoints.  Having
209 		 * them set to non-zero values means asking for trouble
210 		 * especially after unfuse, where we may end up sending
211 		 * through regular tcp paths which expect xmit_list and
212 		 * friends to be correctly setup.
213 		 */
214 		peer_tcp->tcp_unsent = tcp->tcp_unsent = 0;
215 
216 		tcp_timers_stop(tcp);
217 		tcp_timers_stop(peer_tcp);
218 
219 		/*
220 		 * Set receive buffer and max packet size for the
221 		 * active open tcp.
222 		 * eager's values will be set in tcp_accept_finish.
223 		 */
224 		(void) tcp_rwnd_set(peer_tcp, peer_tcp->tcp_connp->conn_rcvbuf);
225 
226 		/*
227 		 * Set the write offset value to zero since we won't
228 		 * be needing any room for TCP/IP headers.
229 		 */
230 		if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) {
231 			struct stroptions *stropt;
232 
233 			DB_TYPE(mp) = M_SETOPTS;
234 			mp->b_wptr += sizeof (*stropt);
235 
236 			stropt = (struct stroptions *)mp->b_rptr;
237 			stropt->so_flags = SO_WROFF;
238 			stropt->so_wroff = 0;
239 
240 			/* Send the options up */
241 			putnext(peer_rq, mp);
242 		} else {
243 			struct sock_proto_props sopp;
244 
245 			/* The peer is a non-STREAMS end point */
246 			ASSERT(IPCL_IS_TCP(peer_connp));
247 
248 			sopp.sopp_flags = SOCKOPT_WROFF;
249 			sopp.sopp_wroff = 0;
250 			(*peer_connp->conn_upcalls->su_set_proto_props)
251 			    (peer_connp->conn_upper_handle, &sopp);
252 		}
253 	} else {
254 		TCP_STAT(tcps, tcp_fusion_unqualified);
255 	}
256 	CONN_DEC_REF(peer_connp);
257 	return;
258 
259 failed:
260 	if (tcp->tcp_fused_sigurg_mp != NULL) {
261 		freeb(tcp->tcp_fused_sigurg_mp);
262 		tcp->tcp_fused_sigurg_mp = NULL;
263 	}
264 	if (peer_tcp->tcp_fused_sigurg_mp != NULL) {
265 		freeb(peer_tcp->tcp_fused_sigurg_mp);
266 		peer_tcp->tcp_fused_sigurg_mp = NULL;
267 	}
268 	CONN_DEC_REF(peer_connp);
269 }
270 
271 /*
272  * Unfuse a previously-fused pair of tcp loopback endpoints.
273  */
274 void
275 tcp_unfuse(tcp_t *tcp)
276 {
277 	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
278 	tcp_stack_t *tcps = tcp->tcp_tcps;
279 
280 	ASSERT(tcp->tcp_fused && peer_tcp != NULL);
281 	ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
282 	ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
283 	ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);
284 
285 	/*
286 	 * Cancel any pending push timers.
287 	 */
288 	if (tcp->tcp_push_tid != 0) {
289 		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
290 		tcp->tcp_push_tid = 0;
291 	}
292 	if (peer_tcp->tcp_push_tid != 0) {
293 		(void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
294 		peer_tcp->tcp_push_tid = 0;
295 	}
296 
297 	/*
298 	 * Drain any pending data; Note that in case of a detached tcp, the
299 	 * draining will happen later after the tcp is unfused.  For non-
300 	 * urgent data, this can be handled by the regular tcp_rcv_drain().
301 	 * If we have urgent data sitting in the receive list, we will
302 	 * need to send up a SIGURG signal first before draining the data.
303 	 * All of these will be handled by the code in tcp_fuse_rcv_drain()
304 	 * when called from tcp_rcv_drain().
305 	 */
306 	if (!TCP_IS_DETACHED(tcp)) {
307 		(void) tcp_fuse_rcv_drain(tcp->tcp_connp->conn_rq, tcp,
308 		    &tcp->tcp_fused_sigurg_mp);
309 	}
310 	if (!TCP_IS_DETACHED(peer_tcp)) {
311 		(void) tcp_fuse_rcv_drain(peer_tcp->tcp_connp->conn_rq,
312 		    peer_tcp,  &peer_tcp->tcp_fused_sigurg_mp);
313 	}
314 
315 	/* Lift up any flow-control conditions */
316 	mutex_enter(&tcp->tcp_non_sq_lock);
317 	if (tcp->tcp_flow_stopped) {
318 		tcp_clrqfull(tcp);
319 		TCP_STAT(tcps, tcp_fusion_backenabled);
320 	}
321 	mutex_exit(&tcp->tcp_non_sq_lock);
322 
323 	mutex_enter(&peer_tcp->tcp_non_sq_lock);
324 	if (peer_tcp->tcp_flow_stopped) {
325 		tcp_clrqfull(peer_tcp);
326 		TCP_STAT(tcps, tcp_fusion_backenabled);
327 	}
328 	mutex_exit(&peer_tcp->tcp_non_sq_lock);
329 
330 	/*
331 	 * Update tha_seq and tha_ack in the header template
332 	 */
333 	tcp->tcp_tcpha->tha_seq = htonl(tcp->tcp_snxt);
334 	tcp->tcp_tcpha->tha_ack = htonl(tcp->tcp_rnxt);
335 	peer_tcp->tcp_tcpha->tha_seq = htonl(peer_tcp->tcp_snxt);
336 	peer_tcp->tcp_tcpha->tha_ack = htonl(peer_tcp->tcp_rnxt);
337 
338 	/* Unfuse the endpoints */
339 	peer_tcp->tcp_fused = tcp->tcp_fused = B_FALSE;
340 	peer_tcp->tcp_loopback_peer = tcp->tcp_loopback_peer = NULL;
341 }
342 
343 /*
344  * Fusion output routine used to handle urgent data sent by STREAMS based
345  * endpoints. This routine is called by tcp_fuse_output() for handling
346  * non-M_DATA mblks.
347  */
348 void
349 tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp)
350 {
351 	mblk_t *mp1;
352 	struct T_exdata_ind *tei;
353 	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
354 	mblk_t *head, *prev_head = NULL;
355 	tcp_stack_t	*tcps = tcp->tcp_tcps;
356 
357 	ASSERT(tcp->tcp_fused);
358 	ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
359 	ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
360 	ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO);
361 	ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA);
362 	ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0);
363 
364 	/*
365 	 * Urgent data arrives in the form of T_EXDATA_REQ from above.
366 	 * Each occurence denotes a new urgent pointer.  For each new
367 	 * urgent pointer we signal (SIGURG) the receiving app to indicate
368 	 * that it needs to go into urgent mode.  This is similar to the
369 	 * urgent data handling in the regular tcp.  We don't need to keep
370 	 * track of where the urgent pointer is, because each T_EXDATA_REQ
371 	 * "advances" the urgent pointer for us.
372 	 *
373 	 * The actual urgent data carried by T_EXDATA_REQ is then prepended
374 	 * by a T_EXDATA_IND before being enqueued behind any existing data
375 	 * destined for the receiving app.  There is only a single urgent
376 	 * pointer (out-of-band mark) for a given tcp.  If the new urgent
377 	 * data arrives before the receiving app reads some existing urgent
378 	 * data, the previous marker is lost.  This behavior is emulated
379 	 * accordingly below, by removing any existing T_EXDATA_IND messages
380 	 * and essentially converting old urgent data into non-urgent.
381 	 */
382 	ASSERT(tcp->tcp_valid_bits & TCP_URG_VALID);
383 	/* Let sender get out of urgent mode */
384 	tcp->tcp_valid_bits &= ~TCP_URG_VALID;
385 
386 	/*
387 	 * This flag indicates that a signal needs to be sent up.
388 	 * This flag will only get cleared once SIGURG is delivered and
389 	 * is not affected by the tcp_fused flag -- delivery will still
390 	 * happen even after an endpoint is unfused, to handle the case
391 	 * where the sending endpoint immediately closes/unfuses after
392 	 * sending urgent data and the accept is not yet finished.
393 	 */
394 	peer_tcp->tcp_fused_sigurg = B_TRUE;
395 
396 	/* Reuse T_EXDATA_REQ mblk for T_EXDATA_IND */
397 	DB_TYPE(mp) = M_PROTO;
398 	tei = (struct T_exdata_ind *)mp->b_rptr;
399 	tei->PRIM_type = T_EXDATA_IND;
400 	tei->MORE_flag = 0;
401 	mp->b_wptr = (uchar_t *)&tei[1];
402 
403 	TCP_STAT(tcps, tcp_fusion_urg);
404 	TCPS_BUMP_MIB(tcps, tcpOutUrg);
405 
406 	head = peer_tcp->tcp_rcv_list;
407 	while (head != NULL) {
408 		/*
409 		 * Remove existing T_EXDATA_IND, keep the data which follows
410 		 * it and relink our list.  Note that we don't modify the
411 		 * tcp_rcv_last_tail since it never points to T_EXDATA_IND.
412 		 */
413 		if (DB_TYPE(head) != M_DATA) {
414 			mp1 = head;
415 
416 			ASSERT(DB_TYPE(mp1->b_cont) == M_DATA);
417 			head = mp1->b_cont;
418 			mp1->b_cont = NULL;
419 			head->b_next = mp1->b_next;
420 			mp1->b_next = NULL;
421 			if (prev_head != NULL)
422 				prev_head->b_next = head;
423 			if (peer_tcp->tcp_rcv_list == mp1)
424 				peer_tcp->tcp_rcv_list = head;
425 			if (peer_tcp->tcp_rcv_last_head == mp1)
426 				peer_tcp->tcp_rcv_last_head = head;
427 			freeb(mp1);
428 		}
429 		prev_head = head;
430 		head = head->b_next;
431 	}
432 }
433 
434 /*
435  * Fusion output routine, called by tcp_output() and tcp_wput_proto().
436  * If we are modifying any member that can be changed outside the squeue,
437  * like tcp_flow_stopped, we need to take tcp_non_sq_lock.
438  */
439 boolean_t
440 tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
441 {
442 	conn_t		*connp = tcp->tcp_connp;
443 	tcp_t		*peer_tcp = tcp->tcp_loopback_peer;
444 	conn_t		*peer_connp = peer_tcp->tcp_connp;
445 	boolean_t	flow_stopped, peer_data_queued = B_FALSE;
446 	boolean_t	urgent = (DB_TYPE(mp) != M_DATA);
447 	boolean_t	push = B_TRUE;
448 	mblk_t		*mp1 = mp;
449 	uint_t		ip_hdr_len;
450 	uint32_t	recv_size = send_size;
451 	tcp_stack_t	*tcps = tcp->tcp_tcps;
452 	netstack_t	*ns = tcps->tcps_netstack;
453 	ip_stack_t	*ipst = ns->netstack_ip;
454 	ipsec_stack_t	*ipss = ns->netstack_ipsec;
455 	iaflags_t	ixaflags = connp->conn_ixa->ixa_flags;
456 	boolean_t	do_ipsec, hooks_out, hooks_in, ipobs_enabled;
457 
458 	ASSERT(tcp->tcp_fused);
459 	ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
460 	ASSERT(connp->conn_sqp == peer_connp->conn_sqp);
461 	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO ||
462 	    DB_TYPE(mp) == M_PCPROTO);
463 
464 	if (send_size == 0) {
465 		freemsg(mp);
466 		return (B_TRUE);
467 	}
468 
469 	/*
470 	 * Handle urgent data; we either send up SIGURG to the peer now
471 	 * or do it later when we drain, in case the peer is detached
472 	 * or if we're short of memory for M_PCSIG mblk.
473 	 */
474 	if (urgent) {
475 		tcp_fuse_output_urg(tcp, mp);
476 
477 		mp1 = mp->b_cont;
478 	}
479 
480 	/*
481 	 * Check that we are still using an IRE_LOCAL or IRE_LOOPBACK before
482 	 * further processes.
483 	 */
484 	if (!ip_output_verify_local(connp->conn_ixa))
485 		goto unfuse;
486 
487 	/*
488 	 * Build IP and TCP header in case we have something that needs the
489 	 * headers. Those cases are:
490 	 * 1. IPsec
491 	 * 2. IPobs
492 	 * 3. FW_HOOKS
493 	 *
494 	 * If tcp_xmit_mp() fails to dupb() the message, unfuse the connection
495 	 * and back to regular path.
496 	 */
497 	if (ixaflags & IXAF_IS_IPV4) {
498 		do_ipsec = (ixaflags & IXAF_IPSEC_SECURE) ||
499 		    CONN_INBOUND_POLICY_PRESENT(peer_connp, ipss);
500 
501 		hooks_out = HOOKS4_INTERESTED_LOOPBACK_OUT(ipst);
502 		hooks_in = HOOKS4_INTERESTED_LOOPBACK_IN(ipst);
503 		ipobs_enabled = (ipst->ips_ip4_observe.he_interested != 0);
504 	} else {
505 		do_ipsec = (ixaflags & IXAF_IPSEC_SECURE) ||
506 		    CONN_INBOUND_POLICY_PRESENT_V6(peer_connp, ipss);
507 
508 		hooks_out = HOOKS6_INTERESTED_LOOPBACK_OUT(ipst);
509 		hooks_in = HOOKS6_INTERESTED_LOOPBACK_IN(ipst);
510 		ipobs_enabled = (ipst->ips_ip6_observe.he_interested != 0);
511 	}
512 
513 	/* We do logical 'or' for efficiency */
514 	if (ipobs_enabled | do_ipsec | hooks_in | hooks_out) {
515 		if ((mp1 = tcp_xmit_mp(tcp, mp1, tcp->tcp_mss, NULL, NULL,
516 		    tcp->tcp_snxt, B_TRUE, NULL, B_FALSE)) == NULL)
517 			/* If tcp_xmit_mp fails, use regular path */
518 			goto unfuse;
519 
520 		/*
521 		 * Leave all IP relevant processes to ip_output_process_local(),
522 		 * which handles IPsec, IPobs, and FW_HOOKS.
523 		 */
524 		mp1 = ip_output_process_local(mp1, connp->conn_ixa, hooks_out,
525 		    hooks_in, do_ipsec ? peer_connp : NULL);
526 
527 		/* If the message is dropped for any reason. */
528 		if (mp1 == NULL)
529 			goto unfuse;
530 
531 		/*
532 		 * Data length might have been changed by FW_HOOKS.
533 		 * We assume that the first mblk contains the TCP/IP headers.
534 		 */
535 		if (hooks_in || hooks_out) {
536 			tcpha_t *tcpha;
537 
538 			ip_hdr_len = (ixaflags & IXAF_IS_IPV4) ?
539 			    IPH_HDR_LENGTH((ipha_t *)mp1->b_rptr) :
540 			    ip_hdr_length_v6(mp1, (ip6_t *)mp1->b_rptr);
541 
542 			tcpha = (tcpha_t *)&mp1->b_rptr[ip_hdr_len];
543 			ASSERT((uchar_t *)tcpha + sizeof (tcpha_t) <=
544 			    mp1->b_wptr);
545 			recv_size += htonl(tcpha->tha_seq) - tcp->tcp_snxt;
546 
547 		}
548 
549 		/*
550 		 * The message duplicated by tcp_xmit_mp is freed.
551 		 * Note: the original message passed in remains unchanged.
552 		 */
553 		freemsg(mp1);
554 	}
555 
556 	/*
557 	 * Enqueue data into the peer's receive list; we may or may not
558 	 * drain the contents depending on the conditions below.
559 	 *
560 	 * For non-STREAMS sockets we normally queue data directly in the
561 	 * socket by calling the su_recv upcall. However, if the peer is
562 	 * detached we use tcp_rcv_enqueue() instead. Queued data will be
563 	 * drained when the accept completes (in tcp_accept_finish()).
564 	 */
565 	if (IPCL_IS_NONSTR(peer_connp) &&
566 	    !TCP_IS_DETACHED(peer_tcp)) {
567 		int error;
568 		int flags = 0;
569 
570 		if ((tcp->tcp_valid_bits & TCP_URG_VALID) &&
571 		    (tcp->tcp_urg == tcp->tcp_snxt)) {
572 			flags = MSG_OOB;
573 			(*peer_connp->conn_upcalls->su_signal_oob)
574 			    (peer_connp->conn_upper_handle, 0);
575 			tcp->tcp_valid_bits &= ~TCP_URG_VALID;
576 		}
577 		if ((*peer_connp->conn_upcalls->su_recv)(
578 		    peer_connp->conn_upper_handle, mp, recv_size,
579 		    flags, &error, &push) < 0) {
580 			ASSERT(error != EOPNOTSUPP);
581 			peer_data_queued = B_TRUE;
582 		}
583 	} else {
584 		if (IPCL_IS_NONSTR(peer_connp) &&
585 		    (tcp->tcp_valid_bits & TCP_URG_VALID) &&
586 		    (tcp->tcp_urg == tcp->tcp_snxt)) {
587 			/*
588 			 * Can not deal with urgent pointers
589 			 * that arrive before the connection has been
590 			 * accept()ed.
591 			 */
592 			tcp->tcp_valid_bits &= ~TCP_URG_VALID;
593 			freemsg(mp);
594 			return (B_TRUE);
595 		}
596 
597 		tcp_rcv_enqueue(peer_tcp, mp, recv_size,
598 		    tcp->tcp_connp->conn_cred);
599 
600 		/* In case it wrapped around and also to keep it constant */
601 		peer_tcp->tcp_rwnd += recv_size;
602 	}
603 
604 	/*
605 	 * Exercise flow-control when needed; we will get back-enabled
606 	 * in either tcp_accept_finish(), tcp_unfuse(), or when data is
607 	 * consumed. If peer endpoint is detached, we emulate streams flow
608 	 * control by checking the peer's queue size and high water mark;
609 	 * otherwise we simply use canputnext() to decide if we need to stop
610 	 * our flow.
611 	 *
612 	 * Since we are accessing our tcp_flow_stopped and might modify it,
613 	 * we need to take tcp->tcp_non_sq_lock.
614 	 */
615 	mutex_enter(&tcp->tcp_non_sq_lock);
616 	flow_stopped = tcp->tcp_flow_stopped;
617 	if ((TCP_IS_DETACHED(peer_tcp) &&
618 	    (peer_tcp->tcp_rcv_cnt >= peer_connp->conn_rcvbuf)) ||
619 	    (!TCP_IS_DETACHED(peer_tcp) &&
620 	    !IPCL_IS_NONSTR(peer_connp) && !canputnext(peer_connp->conn_rq))) {
621 		peer_data_queued = B_TRUE;
622 	}
623 
624 	if (!flow_stopped && (peer_data_queued ||
625 	    (TCP_UNSENT_BYTES(tcp) >= connp->conn_sndbuf))) {
626 		tcp_setqfull(tcp);
627 		flow_stopped = B_TRUE;
628 		TCP_STAT(tcps, tcp_fusion_flowctl);
629 		DTRACE_PROBE3(tcp__fuse__output__flowctl, tcp_t *, tcp,
630 		    uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt);
631 	} else if (flow_stopped && !peer_data_queued &&
632 	    (TCP_UNSENT_BYTES(tcp) <= connp->conn_sndlowat)) {
633 		tcp_clrqfull(tcp);
634 		TCP_STAT(tcps, tcp_fusion_backenabled);
635 		flow_stopped = B_FALSE;
636 	}
637 	mutex_exit(&tcp->tcp_non_sq_lock);
638 
639 	ipst->ips_loopback_packets++;
640 	tcp->tcp_last_sent_len = send_size;
641 
642 	/* Need to adjust the following SNMP MIB-related variables */
643 	tcp->tcp_snxt += send_size;
644 	tcp->tcp_suna = tcp->tcp_snxt;
645 	peer_tcp->tcp_rnxt += recv_size;
646 	peer_tcp->tcp_rack = peer_tcp->tcp_rnxt;
647 
648 	TCPS_BUMP_MIB(tcps, tcpOutDataSegs);
649 	TCPS_UPDATE_MIB(tcps, tcpOutDataBytes, send_size);
650 
651 	TCPS_BUMP_MIB(tcps, tcpHCInSegs);
652 	TCPS_BUMP_MIB(tcps, tcpInDataInorderSegs);
653 	TCPS_UPDATE_MIB(tcps, tcpInDataInorderBytes, send_size);
654 
655 	BUMP_LOCAL(tcp->tcp_obsegs);
656 	BUMP_LOCAL(peer_tcp->tcp_ibsegs);
657 
658 	DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);
659 
660 	if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
661 	    !TCP_IS_DETACHED(peer_tcp)) {
662 		/*
663 		 * Drain the peer's receive queue it has urgent data or if
664 		 * we're not flow-controlled.
665 		 */
666 		if (urgent || !flow_stopped) {
667 			ASSERT(peer_tcp->tcp_rcv_list != NULL);
668 			/*
669 			 * For TLI-based streams, a thread in tcp_accept_swap()
670 			 * can race with us.  That thread will ensure that the
671 			 * correct peer_connp->conn_rq is globally visible
672 			 * before peer_tcp->tcp_detached is visible as clear,
673 			 * but we must also ensure that the load of conn_rq
674 			 * cannot be reordered to be before the tcp_detached
675 			 * check.
676 			 */
677 			membar_consumer();
678 			(void) tcp_fuse_rcv_drain(peer_connp->conn_rq, peer_tcp,
679 			    NULL);
680 		}
681 	}
682 	return (B_TRUE);
683 unfuse:
684 	tcp_unfuse(tcp);
685 	return (B_FALSE);
686 }
687 
688 /*
689  * This routine gets called to deliver data upstream on a fused or
690  * previously fused tcp loopback endpoint; the latter happens only
691  * when there is a pending SIGURG signal plus urgent data that can't
692  * be sent upstream in the past.
693  */
694 boolean_t
695 tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
696 {
697 	mblk_t *mp;
698 	conn_t	*connp = tcp->tcp_connp;
699 
700 #ifdef DEBUG
701 	uint_t cnt = 0;
702 #endif
703 	tcp_stack_t	*tcps = tcp->tcp_tcps;
704 	tcp_t		*peer_tcp = tcp->tcp_loopback_peer;
705 
706 	ASSERT(tcp->tcp_loopback);
707 	ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
708 	ASSERT(!tcp->tcp_fused || tcp->tcp_loopback_peer != NULL);
709 	ASSERT(IPCL_IS_NONSTR(connp) || sigurg_mpp != NULL || tcp->tcp_fused);
710 
711 	/* No need for the push timer now, in case it was scheduled */
712 	if (tcp->tcp_push_tid != 0) {
713 		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
714 		tcp->tcp_push_tid = 0;
715 	}
716 	/*
717 	 * If there's urgent data sitting in receive list and we didn't
718 	 * get a chance to send up a SIGURG signal, make sure we send
719 	 * it first before draining in order to ensure that SIOCATMARK
720 	 * works properly.
721 	 */
722 	if (tcp->tcp_fused_sigurg) {
723 		ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
724 
725 		tcp->tcp_fused_sigurg = B_FALSE;
726 		/*
727 		 * sigurg_mpp is normally NULL, i.e. when we're still
728 		 * fused and didn't get here because of tcp_unfuse().
729 		 * In this case try hard to allocate the M_PCSIG mblk.
730 		 */
731 		if (sigurg_mpp == NULL &&
732 		    (mp = allocb(1, BPRI_HI)) == NULL &&
733 		    (mp = allocb_tryhard(1)) == NULL) {
734 			/* Alloc failed; try again next time */
735 			tcp->tcp_push_tid = TCP_TIMER(tcp,
736 			    tcp_push_timer,
737 			    MSEC_TO_TICK(
738 			    tcps->tcps_push_timer_interval));
739 			return (B_TRUE);
740 		} else if (sigurg_mpp != NULL) {
741 			/*
742 			 * Use the supplied M_PCSIG mblk; it means we're
743 			 * either unfused or in the process of unfusing,
744 			 * and the drain must happen now.
745 			 */
746 			mp = *sigurg_mpp;
747 			*sigurg_mpp = NULL;
748 		}
749 		ASSERT(mp != NULL);
750 
751 		/* Send up the signal */
752 		DB_TYPE(mp) = M_PCSIG;
753 		*mp->b_wptr++ = (uchar_t)SIGURG;
754 		putnext(q, mp);
755 
756 		/*
757 		 * Let the regular tcp_rcv_drain() path handle
758 		 * draining the data if we're no longer fused.
759 		 */
760 		if (!tcp->tcp_fused)
761 			return (B_FALSE);
762 	}
763 
764 	/* Drain the data */
765 	while ((mp = tcp->tcp_rcv_list) != NULL) {
766 		tcp->tcp_rcv_list = mp->b_next;
767 		mp->b_next = NULL;
768 #ifdef DEBUG
769 		cnt += msgdsize(mp);
770 #endif
771 		ASSERT(!IPCL_IS_NONSTR(connp));
772 		putnext(q, mp);
773 		TCP_STAT(tcps, tcp_fusion_putnext);
774 	}
775 
776 #ifdef DEBUG
777 	ASSERT(cnt == tcp->tcp_rcv_cnt);
778 #endif
779 	tcp->tcp_rcv_last_head = NULL;
780 	tcp->tcp_rcv_last_tail = NULL;
781 	tcp->tcp_rcv_cnt = 0;
782 	tcp->tcp_rwnd = tcp->tcp_connp->conn_rcvbuf;
783 
784 	mutex_enter(&peer_tcp->tcp_non_sq_lock);
785 	if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <=
786 	    peer_tcp->tcp_connp->conn_sndlowat)) {
787 		tcp_clrqfull(peer_tcp);
788 		TCP_STAT(tcps, tcp_fusion_backenabled);
789 	}
790 	mutex_exit(&peer_tcp->tcp_non_sq_lock);
791 
792 	return (B_TRUE);
793 }
794 
795 /*
796  * Calculate the size of receive buffer for a fused tcp endpoint.
797  */
798 size_t
799 tcp_fuse_set_rcv_hiwat(tcp_t *tcp, size_t rwnd)
800 {
801 	tcp_stack_t	*tcps = tcp->tcp_tcps;
802 	uint32_t	max_win;
803 
804 	ASSERT(tcp->tcp_fused);
805 
806 	/* Ensure that value is within the maximum upper bound */
807 	if (rwnd > tcps->tcps_max_buf)
808 		rwnd = tcps->tcps_max_buf;
809 	/*
810 	 * Round up to system page size in case SO_RCVBUF is modified
811 	 * after SO_SNDBUF; the latter is also similarly rounded up.
812 	 */
813 	rwnd = P2ROUNDUP_TYPED(rwnd, PAGESIZE, size_t);
814 	max_win = TCP_MAXWIN << tcp->tcp_rcv_ws;
815 	if (rwnd > max_win) {
816 		rwnd = max_win - (max_win % tcp->tcp_mss);
817 		if (rwnd < tcp->tcp_mss)
818 			rwnd = max_win;
819 	}
820 
821 	/*
822 	 * Record high water mark, this is used for flow-control
823 	 * purposes in tcp_fuse_output().
824 	 */
825 	tcp->tcp_connp->conn_rcvbuf = rwnd;
826 	tcp->tcp_rwnd = rwnd;
827 	return (rwnd);
828 }
829 
830 /*
831  * Calculate the maximum outstanding unread data block for a fused tcp endpoint.
832  */
833 int
834 tcp_fuse_maxpsz(tcp_t *tcp)
835 {
836 	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
837 	conn_t *connp = tcp->tcp_connp;
838 	uint_t sndbuf = connp->conn_sndbuf;
839 	uint_t maxpsz = sndbuf;
840 
841 	ASSERT(tcp->tcp_fused);
842 	ASSERT(peer_tcp != NULL);
843 	ASSERT(peer_tcp->tcp_connp->conn_rcvbuf != 0);
844 	/*
845 	 * In the fused loopback case, we want the stream head to split
846 	 * up larger writes into smaller chunks for a more accurate flow-
847 	 * control accounting.  Our maxpsz is half of the sender's send
848 	 * buffer or the receiver's receive buffer, whichever is smaller.
849 	 * We round up the buffer to system page size due to the lack of
850 	 * TCP MSS concept in Fusion.
851 	 */
852 	if (maxpsz > peer_tcp->tcp_connp->conn_rcvbuf)
853 		maxpsz = peer_tcp->tcp_connp->conn_rcvbuf;
854 	maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1;
855 
856 	return (maxpsz);
857 }
858 
859 /*
860  * Called to release flow control.
861  */
862 void
863 tcp_fuse_backenable(tcp_t *tcp)
864 {
865 	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
866 
867 	ASSERT(tcp->tcp_fused);
868 	ASSERT(peer_tcp != NULL && peer_tcp->tcp_fused);
869 	ASSERT(peer_tcp->tcp_loopback_peer == tcp);
870 	ASSERT(!TCP_IS_DETACHED(tcp));
871 	ASSERT(tcp->tcp_connp->conn_sqp ==
872 	    peer_tcp->tcp_connp->conn_sqp);
873 
874 	if (tcp->tcp_rcv_list != NULL)
875 		(void) tcp_fuse_rcv_drain(tcp->tcp_connp->conn_rq, tcp, NULL);
876 
877 	mutex_enter(&peer_tcp->tcp_non_sq_lock);
878 	if (peer_tcp->tcp_flow_stopped &&
879 	    (TCP_UNSENT_BYTES(peer_tcp) <=
880 	    peer_tcp->tcp_connp->conn_sndlowat)) {
881 		tcp_clrqfull(peer_tcp);
882 	}
883 	mutex_exit(&peer_tcp->tcp_non_sq_lock);
884 
885 	TCP_STAT(tcp->tcp_tcps, tcp_fusion_backenabled);
886 }
887