xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision 1b8adde7ba7d5e04395c141c5400dc2cffd7d809)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #include <sys/stream.h>
76 #include <sys/ib/clients/rds/rdsib_cm.h>
77 #include <sys/ib/clients/rds/rdsib_ib.h>
78 #include <sys/ib/clients/rds/rdsib_buf.h>
79 #include <sys/ib/clients/rds/rdsib_ep.h>
80 #include <sys/ib/clients/rds/rds_kstat.h>
81 #include <sys/zone.h>
82 
83 #define	RDS_POLL_CQ_IN_2TICKS	1
84 
85 /*
86  * This File contains the endpoint related calls
87  */
88 
89 extern boolean_t rds_islocal(ipaddr_t addr);
90 extern uint_t rds_wc_signal;
91 
92 #define	RDS_LOOPBACK	0
93 #define	RDS_LOCAL	1
94 #define	RDS_REMOTE	2
95 
96 #define	IBT_IPADDR	1
97 
98 static uint8_t
99 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
100 {
101 	uint8_t	ret;
102 
103 	switch (qualifier) {
104 	case RDS_LOOPBACK: /* loopback */
105 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
106 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
107 		rw_exit(&rds_loopback_portmap_lock);
108 		break;
109 
110 	case RDS_LOCAL: /* Session local */
111 		ASSERT(sp != NULL);
112 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
113 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
114 		rw_exit(&sp->session_local_portmap_lock);
115 		break;
116 
117 	case RDS_REMOTE: /* Session remote */
118 		ASSERT(sp != NULL);
119 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
120 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
121 		rw_exit(&sp->session_remote_portmap_lock);
122 		break;
123 	}
124 
125 	return (ret);
126 }
127 
128 static uint8_t
129 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
130 {
131 	uint8_t	ret;
132 
133 	switch (qualifier) {
134 	case RDS_LOOPBACK: /* loopback */
135 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
136 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
137 		if (!ret) {
138 			/* port is not marked, mark it */
139 			rds_loopback_portmap[port/8] =
140 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
141 		}
142 		rw_exit(&rds_loopback_portmap_lock);
143 		break;
144 
145 	case RDS_LOCAL: /* Session local */
146 		ASSERT(sp != NULL);
147 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
148 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
149 		if (!ret) {
150 			/* port is not marked, mark it */
151 			sp->session_local_portmap[port/8] =
152 			    sp->session_local_portmap[port/8] |
153 			    (1 << (port % 8));
154 		}
155 		rw_exit(&sp->session_local_portmap_lock);
156 		break;
157 
158 	case RDS_REMOTE: /* Session remote */
159 		ASSERT(sp != NULL);
160 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
161 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
162 		if (!ret) {
163 			/* port is not marked, mark it */
164 			sp->session_remote_portmap[port/8] =
165 			    sp->session_remote_portmap[port/8] |
166 			    (1 << (port % 8));
167 		}
168 		rw_exit(&sp->session_remote_portmap_lock);
169 		break;
170 	}
171 
172 	return (ret);
173 }
174 
175 static uint8_t
176 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
177 {
178 	uint8_t	ret;
179 
180 	switch (qualifier) {
181 	case RDS_LOOPBACK: /* loopback */
182 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
183 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
184 		if (ret) {
185 			/* port is marked, unmark it */
186 			rds_loopback_portmap[port/8] =
187 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
188 		}
189 		rw_exit(&rds_loopback_portmap_lock);
190 		break;
191 
192 	case RDS_LOCAL: /* Session local */
193 		ASSERT(sp != NULL);
194 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
195 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
196 		if (ret) {
197 			/* port is marked, unmark it */
198 			sp->session_local_portmap[port/8] =
199 			    sp->session_local_portmap[port/8] &
200 			    ~(1 << (port % 8));
201 		}
202 		rw_exit(&sp->session_local_portmap_lock);
203 		break;
204 
205 	case RDS_REMOTE: /* Session remote */
206 		ASSERT(sp != NULL);
207 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
208 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
209 		if (ret) {
210 			/* port is marked, unmark it */
211 			sp->session_remote_portmap[port/8] =
212 			    sp->session_remote_portmap[port/8] &
213 			    ~(1 << (port % 8));
214 		}
215 		rw_exit(&sp->session_remote_portmap_lock);
216 		break;
217 	}
218 
219 	return (ret);
220 }
221 
222 static void
223 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
224 {
225 	switch (qualifier) {
226 	case RDS_LOOPBACK: /* loopback */
227 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
228 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
229 		rw_exit(&rds_loopback_portmap_lock);
230 		break;
231 
232 	case RDS_LOCAL: /* Session local */
233 		ASSERT(sp != NULL);
234 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
235 		(void) memset(sp->session_local_portmap, 0xFF,
236 		    RDS_PORT_MAP_SIZE);
237 		rw_exit(&sp->session_local_portmap_lock);
238 		break;
239 
240 	case RDS_REMOTE: /* Session remote */
241 		ASSERT(sp != NULL);
242 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
243 		(void) memset(sp->session_remote_portmap, 0xFF,
244 		    RDS_PORT_MAP_SIZE);
245 		rw_exit(&sp->session_remote_portmap_lock);
246 		break;
247 	}
248 }
249 
250 static void
251 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
252 {
253 	switch (qualifier) {
254 	case RDS_LOOPBACK: /* loopback */
255 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
256 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
257 		rw_exit(&rds_loopback_portmap_lock);
258 		break;
259 
260 	case RDS_LOCAL: /* Session local */
261 		ASSERT(sp != NULL);
262 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
263 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
264 		rw_exit(&sp->session_local_portmap_lock);
265 		break;
266 
267 	case RDS_REMOTE: /* Session remote */
268 		ASSERT(sp != NULL);
269 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
270 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
271 		rw_exit(&sp->session_remote_portmap_lock);
272 		break;
273 	}
274 }
275 
276 static boolean_t
277 rds_add_session(rds_session_t *sp, boolean_t locked)
278 {
279 	boolean_t retval = B_TRUE;
280 
281 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
282 
283 	if (!locked) {
284 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
285 	}
286 
287 	/* Don't allow more sessions than configured in rdsib.conf */
288 	if (rdsib_statep->rds_nsessions >= (MaxNodes - 1)) {
289 		RDS_DPRINTF1("rds_add_session", "Max session limit reached");
290 		retval = B_FALSE;
291 	} else {
292 		sp->session_nextp = rdsib_statep->rds_sessionlistp;
293 		rdsib_statep->rds_sessionlistp = sp;
294 		rdsib_statep->rds_nsessions++;
295 		RDS_INCR_SESS();
296 	}
297 
298 	if (!locked) {
299 		rw_exit(&rdsib_statep->rds_sessionlock);
300 	}
301 
302 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
303 
304 	return (retval);
305 }
306 
307 /* Session lookup based on destination IP or destination node guid */
308 rds_session_t *
309 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
310 {
311 	rds_session_t	*sp;
312 
313 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
314 	    remoteip, node_guid);
315 
316 	/* A read/write lock is expected, will panic if none of them are held */
317 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
318 	sp = statep->rds_sessionlistp;
319 	while (sp) {
320 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
321 		    (sp->session_rgid.gid_guid == node_guid))) {
322 			break;
323 		}
324 
325 		sp = sp->session_nextp;
326 	}
327 
328 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
329 
330 	return (sp);
331 }
332 
333 boolean_t
334 rds_session_lkup_by_sp(rds_session_t *sp)
335 {
336 	rds_session_t *sessionp;
337 
338 	RDS_DPRINTF4("rds_session_lkup_by_sp", "Enter: 0x%p", sp);
339 
340 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
341 	sessionp = rdsib_statep->rds_sessionlistp;
342 	while (sessionp) {
343 		if (sessionp == sp) {
344 			rw_exit(&rdsib_statep->rds_sessionlock);
345 			return (B_TRUE);
346 		}
347 
348 		sessionp = sessionp->session_nextp;
349 	}
350 	rw_exit(&rdsib_statep->rds_sessionlock);
351 
352 	return (B_FALSE);
353 }
354 
355 static void
356 rds_ep_fini(rds_ep_t *ep)
357 {
358 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
359 
360 	/* free send pool */
361 	rds_free_send_pool(ep);
362 
363 	/* free recv pool */
364 	rds_free_recv_pool(ep);
365 
366 	mutex_enter(&ep->ep_lock);
367 	ep->ep_hca_guid = 0;
368 	mutex_exit(&ep->ep_lock);
369 
370 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
371 }
372 
373 /* Assumes SP write lock is held */
374 int
375 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
376 {
377 	uint_t		ret;
378 
379 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
380 
381 	/* send pool */
382 	ret = rds_init_send_pool(ep, hca_guid);
383 	if (ret != 0) {
384 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
385 		    ep, ret);
386 		return (-1);
387 	}
388 
389 	/* recv pool */
390 	ret = rds_init_recv_pool(ep);
391 	if (ret != 0) {
392 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
393 		    ep, ret);
394 		rds_free_send_pool(ep);
395 		return (-1);
396 	}
397 
398 	/* reset the ep state */
399 	mutex_enter(&ep->ep_lock);
400 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
401 	ep->ep_hca_guid = hca_guid;
402 	ep->ep_lbufid = NULL;
403 	ep->ep_rbufid = NULL;
404 	ep->ep_segfbp = NULL;
405 	ep->ep_seglbp = NULL;
406 
407 	/* Initialize the WR to send acknowledgements */
408 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
409 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
410 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
411 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
412 	ep->ep_ackwr.wr_nds = 1;
413 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
414 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
415 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
416 	mutex_exit(&ep->ep_lock);
417 
418 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
419 
420 	return (0);
421 }
422 
423 static int
424 rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
425 {
426 	int	ret;
427 
428 	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
429 	    ep, ep->ep_type);
430 
431 	/* Re-initialize send pool */
432 	ret = rds_reinit_send_pool(ep, hca_guid);
433 	if (ret != 0) {
434 		RDS_DPRINTF2("rds_ep_reinit",
435 		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
436 		return (-1);
437 	}
438 
439 	/* free all the receive buffers in the pool */
440 	rds_free_recv_pool(ep);
441 
442 	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
443 	    ep, ep->ep_type);
444 
445 	return (0);
446 }
447 
448 void
449 rds_session_fini(rds_session_t *sp)
450 {
451 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
452 
453 	rds_ep_fini(&sp->session_dataep);
454 	rds_ep_fini(&sp->session_ctrlep);
455 
456 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
457 }
458 
459 /*
460  * Allocate and initialize the resources needed for the control and
461  * data channels
462  */
463 int
464 rds_session_init(rds_session_t *sp)
465 {
466 	int		ret;
467 	rds_hca_t	*hcap;
468 	ib_guid_t	hca_guid;
469 
470 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
471 
472 	/* CALLED WITH SESSION WRITE LOCK */
473 
474 	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
475 	if (hcap == NULL) {
476 		RDS_DPRINTF2("rds_session_init", "SGID is on an uninitialized "
477 		    "HCA: %llx", sp->session_lgid.gid_guid);
478 		return (-1);
479 	}
480 
481 	hca_guid = hcap->hca_guid;
482 
483 	/* allocate and initialize the ctrl channel */
484 	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
485 	if (ret != 0) {
486 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
487 		    "failed", sp, &sp->session_ctrlep);
488 		return (-1);
489 	}
490 
491 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
492 
493 	/* allocate and initialize the data channel */
494 	ret = rds_ep_init(&sp->session_dataep, hca_guid);
495 	if (ret != 0) {
496 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
497 		    "failed", sp, &sp->session_dataep);
498 		rds_ep_fini(&sp->session_ctrlep);
499 		return (-1);
500 	}
501 
502 	/* Clear the portmaps */
503 	rds_unmark_all_ports(sp, RDS_LOCAL);
504 	rds_unmark_all_ports(sp, RDS_REMOTE);
505 
506 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
507 
508 	RDS_DPRINTF2("rds_session_init", "Return");
509 
510 	return (0);
511 }
512 
513 /*
514  * This should be called before moving a session from ERROR state to
515  * INIT state. This will update the HCA keys incase the session has moved from
516  * one HCA to another.
517  */
518 int
519 rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
520 {
521 	rds_hca_t	*hcap, *hcap1;
522 	int		ret;
523 
524 	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p)", sp);
525 
526 	/* CALLED WITH SESSION WRITE LOCK */
527 
528 	/* Clear the portmaps */
529 	rds_unmark_all_ports(sp, RDS_LOCAL);
530 	rds_unmark_all_ports(sp, RDS_REMOTE);
531 
532 	/* make the last buffer as the acknowledged */
533 	*(uintptr_t *)sp->session_dataep.ep_ack_addr =
534 	    (uintptr_t)sp->session_dataep.ep_sndpool.pool_tailp;
535 
536 	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
537 	if (hcap == NULL) {
538 		RDS_DPRINTF2("rds_session_reinit", "SGID is on an "
539 		    "uninitialized HCA: %llx", lgid.gid_guid);
540 		return (-1);
541 	}
542 
543 	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
544 	if (hcap1 == NULL) {
545 		RDS_DPRINTF2("rds_session_reinit", "Seems like HCA %llx "
546 		    "is unplugged", sp->session_lgid.gid_guid);
547 	} else if (hcap->hca_guid == hcap1->hca_guid) {
548 		/*
549 		 * No action is needed as the session did not move across
550 		 * HCAs
551 		 */
552 		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
553 		return (0);
554 	}
555 
556 	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");
557 
558 	/* re-initialize the control channel */
559 	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
560 	if (ret != 0) {
561 		RDS_DPRINTF2("rds_session_reinit",
562 		    "SP(%p): Ctrl EP(%p) re-initialization failed",
563 		    sp, &sp->session_ctrlep);
564 		return (-1);
565 	}
566 
567 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
568 	    sp, &sp->session_ctrlep);
569 
570 	/* re-initialize the data channel */
571 	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
572 	if (ret != 0) {
573 		RDS_DPRINTF2("rds_session_reinit",
574 		    "SP(%p): Data EP(%p) re-initialization failed",
575 		    sp, &sp->session_dataep);
576 		return (-1);
577 	}
578 
579 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
580 	    sp, &sp->session_dataep);
581 
582 	sp->session_lgid = lgid;
583 
584 	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);
585 
586 	return (0);
587 }
588 
589 static int
590 rds_session_connect(rds_session_t *sp)
591 {
592 	ibt_channel_hdl_t	ctrlchan, datachan;
593 	rds_ep_t		*ep;
594 	int			ret;
595 
596 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
597 
598 	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;
599 
600 	/* Override the packet life time based on the conf file */
601 	if (IBPktLifeTime != 0) {
602 		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
603 		    IBPktLifeTime;
604 	}
605 
606 	/* Session type may change if we run into peer-to-peer case. */
607 	rw_enter(&sp->session_lock, RW_READER);
608 	if (sp->session_type == RDS_SESSION_PASSIVE) {
609 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
610 		    "active end", sp);
611 		rw_exit(&sp->session_lock);
612 		return (0); /* return success */
613 	}
614 	rw_exit(&sp->session_lock);
615 
616 	/* connect the data ep first */
617 	ep = &sp->session_dataep;
618 	mutex_enter(&ep->ep_lock);
619 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
620 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
621 		mutex_exit(&ep->ep_lock);
622 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
623 		    &datachan);
624 		if (ret != IBT_SUCCESS) {
625 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
626 			    "failed: %d", ep, ret);
627 			return (-1);
628 		}
629 		sp->session_dataep.ep_chanhdl = datachan;
630 	} else {
631 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
632 		    "unexpected state: %d", sp, ep, ep->ep_state);
633 		mutex_exit(&ep->ep_lock);
634 		return (-1);
635 	}
636 
637 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
638 	    sp, ep);
639 
640 	ep = &sp->session_ctrlep;
641 	mutex_enter(&ep->ep_lock);
642 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
643 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
644 		mutex_exit(&ep->ep_lock);
645 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
646 		    &ctrlchan);
647 		if (ret != IBT_SUCCESS) {
648 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
649 			    "failed: %d", ep, ret);
650 			return (-1);
651 		}
652 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
653 	} else {
654 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
655 		    "unexpected state: %d", sp, ep, ep->ep_state);
656 		mutex_exit(&ep->ep_lock);
657 		return (-1);
658 	}
659 
660 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
661 	    sp, sp->session_myip, sp->session_remip);
662 
663 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
664 
665 	return (0);
666 }
667 
668 /*
669  * Can be called with or without session_lock.
670  */
671 void
672 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
673 {
674 	rds_ep_t		*ep;
675 
676 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
677 	    sp->session_state);
678 
679 	ep = &sp->session_dataep;
680 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
681 
682 	/* wait until the SQ is empty before closing */
683 	if (wait != 0) {
684 		(void) rds_is_sendq_empty(ep, wait);
685 	}
686 
687 	mutex_enter(&ep->ep_lock);
688 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
689 		mutex_exit(&ep->ep_lock);
690 		delay(drv_usectohz(300000));
691 		mutex_enter(&ep->ep_lock);
692 	}
693 
694 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
695 		ep->ep_state = RDS_EP_STATE_CLOSING;
696 		mutex_exit(&ep->ep_lock);
697 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
698 		if (wait == 0) {
699 			/* make sure all WCs are flushed before proceeding */
700 			(void) rds_is_sendq_empty(ep, 1);
701 		}
702 		mutex_enter(&ep->ep_lock);
703 	}
704 	rds_ep_free_rc_channel(ep);
705 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
706 	ep->ep_segfbp = NULL;
707 	ep->ep_seglbp = NULL;
708 	mutex_exit(&ep->ep_lock);
709 
710 	ep = &sp->session_ctrlep;
711 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
712 
713 	/* wait until the SQ is empty before closing */
714 	if (wait != 0) {
715 		(void) rds_is_sendq_empty(ep, wait);
716 	}
717 
718 	mutex_enter(&ep->ep_lock);
719 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
720 		mutex_exit(&ep->ep_lock);
721 		delay(drv_usectohz(300000));
722 		mutex_enter(&ep->ep_lock);
723 	}
724 
725 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
726 		ep->ep_state = RDS_EP_STATE_CLOSING;
727 		mutex_exit(&ep->ep_lock);
728 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
729 		if (wait == 0) {
730 			/* make sure all WCs are flushed before proceeding */
731 			(void) rds_is_sendq_empty(ep, 1);
732 		}
733 		mutex_enter(&ep->ep_lock);
734 	}
735 	rds_ep_free_rc_channel(ep);
736 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
737 	ep->ep_segfbp = NULL;
738 	ep->ep_seglbp = NULL;
739 	mutex_exit(&ep->ep_lock);
740 
741 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
742 }
743 
744 /* Free the session */
745 static void
746 rds_destroy_session(rds_session_t *sp)
747 {
748 	rds_ep_t	*ep;
749 	rds_bufpool_t	*pool;
750 
751 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
752 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
753 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
754 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
755 
756 	rw_enter(&sp->session_lock, RW_READER);
757 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
758 	    sp->session_state);
759 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
760 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
761 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
762 		rw_exit(&sp->session_lock);
763 		delay(drv_usectohz(1000000));
764 		rw_enter(&sp->session_lock, RW_READER);
765 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
766 		    "ON SESSION", sp, sp->session_state);
767 	}
768 	rw_exit(&sp->session_lock);
769 
770 	/* data channel */
771 	ep = &sp->session_dataep;
772 
773 	/* send pool locks */
774 	pool = &ep->ep_sndpool;
775 	cv_destroy(&pool->pool_cv);
776 	mutex_destroy(&pool->pool_lock);
777 
778 	/* recv pool locks */
779 	pool = &ep->ep_rcvpool;
780 	cv_destroy(&pool->pool_cv);
781 	mutex_destroy(&pool->pool_lock);
782 	mutex_destroy(&ep->ep_recvqp.qp_lock);
783 
784 	/* control channel */
785 	ep = &sp->session_ctrlep;
786 
787 	/* send pool locks */
788 	pool = &ep->ep_sndpool;
789 	cv_destroy(&pool->pool_cv);
790 	mutex_destroy(&pool->pool_lock);
791 
792 	/* recv pool locks */
793 	pool = &ep->ep_rcvpool;
794 	cv_destroy(&pool->pool_cv);
795 	mutex_destroy(&pool->pool_lock);
796 	mutex_destroy(&ep->ep_recvqp.qp_lock);
797 
798 	/* session */
799 	rw_destroy(&sp->session_lock);
800 	rw_destroy(&sp->session_local_portmap_lock);
801 	rw_destroy(&sp->session_remote_portmap_lock);
802 
803 	/* free the session */
804 	kmem_free(sp, sizeof (rds_session_t));
805 
806 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
807 }
808 
809 /* This is called on the taskq thread */
810 static void
811 rds_failover_session(void *arg)
812 {
813 	rds_session_t	*sp = (rds_session_t *)arg;
814 	ib_gid_t	lgid, rgid;
815 	ipaddr_t	myip, remip;
816 	int		ret, cnt = 0;
817 
818 	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);
819 
820 	/* Make sure the session is still alive */
821 	if (rds_session_lkup_by_sp(sp) == B_FALSE) {
822 		RDS_DPRINTF2("rds_failover_session",
823 		    "Return: SP(%p) not ALIVE", sp);
824 		return;
825 	}
826 
827 	RDS_INCR_FAILOVERS();
828 
829 	rw_enter(&sp->session_lock, RW_WRITER);
830 	if (sp->session_type != RDS_SESSION_ACTIVE) {
831 		/*
832 		 * The remote side must have seen the error and initiated
833 		 * a re-connect.
834 		 */
835 		RDS_DPRINTF2("rds_failover_session",
836 		    "SP(%p) has become passive", sp);
837 		rw_exit(&sp->session_lock);
838 		return;
839 	}
840 	sp->session_failover = 1;
841 	rw_exit(&sp->session_lock);
842 
843 	/*
844 	 * The session is in ERROR state but close both channels
845 	 * for a clean start.
846 	 */
847 	rds_session_close(sp, IBT_BLOCKING, 1);
848 
849 	/* wait 1 sec before re-connecting */
850 	delay(drv_usectohz(1000000));
851 
852 	do {
853 		ibt_ip_path_attr_t	ipattr;
854 		ibt_ip_addr_t		dstip;
855 
856 		/* The ipaddr should be in the network order */
857 		myip = sp->session_myip;
858 		remip = sp->session_remip;
859 		ret = rds_sc_path_lookup(&myip, &remip);
860 		if (ret == 0) {
861 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
862 			    myip, remip);
863 		}
864 		/* check if we have (new) path from the source to destination */
865 		lgid.gid_prefix = 0;
866 		lgid.gid_guid = 0;
867 		rgid.gid_prefix = 0;
868 		rgid.gid_guid = 0;
869 
870 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
871 		dstip.family = AF_INET;
872 		dstip.un.ip4addr = remip;
873 		ipattr.ipa_dst_ip = &dstip;
874 		ipattr.ipa_src_ip.family = AF_INET;
875 		ipattr.ipa_src_ip.un.ip4addr = myip;
876 		ipattr.ipa_ndst = 1;
877 		ipattr.ipa_max_paths = 1;
878 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
879 		    myip, remip);
880 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
881 		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
882 		if (ret == IBT_SUCCESS) {
883 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
884 			lgid = sp->session_pinfo.
885 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
886 			rgid = sp->session_pinfo.
887 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
888 			break;
889 		}
890 
891 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);
892 
893 		/* wait 1 sec before re-trying */
894 		delay(drv_usectohz(1000000));
895 		cnt++;
896 	} while (cnt < 5);
897 
898 	if (ret != IBT_SUCCESS) {
899 		rw_enter(&sp->session_lock, RW_WRITER);
900 		if (sp->session_type == RDS_SESSION_ACTIVE) {
901 			rds_session_fini(sp);
902 			sp->session_state = RDS_SESSION_STATE_FAILED;
903 			sp->session_failover = 0;
904 			RDS_DPRINTF3("rds_failover_session",
905 			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
906 		} else {
907 			RDS_DPRINTF2("rds_failover_session",
908 			    "SP(%p) has become passive", sp);
909 		}
910 		rw_exit(&sp->session_lock);
911 		return;
912 	}
913 
914 	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
915 	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
916 	    rgid.gid_guid);
917 
918 	rw_enter(&sp->session_lock, RW_WRITER);
919 	if (sp->session_type != RDS_SESSION_ACTIVE) {
920 		/*
921 		 * The remote side must have seen the error and initiated
922 		 * a re-connect.
923 		 */
924 		RDS_DPRINTF2("rds_failover_session",
925 		    "SP(%p) has become passive", sp);
926 		rw_exit(&sp->session_lock);
927 		return;
928 	}
929 
930 	/* move the session to init state */
931 	ret = rds_session_reinit(sp, lgid);
932 	sp->session_lgid = lgid;
933 	sp->session_rgid = rgid;
934 	if (ret != 0) {
935 		rds_session_fini(sp);
936 		sp->session_state = RDS_SESSION_STATE_FAILED;
937 		sp->session_failover = 0;
938 		RDS_DPRINTF3("rds_failover_session",
939 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
940 		rw_exit(&sp->session_lock);
941 		return;
942 	} else {
943 		sp->session_state = RDS_SESSION_STATE_INIT;
944 		RDS_DPRINTF3("rds_failover_session",
945 		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
946 	}
947 	rw_exit(&sp->session_lock);
948 
949 	rds_session_open(sp);
950 
951 	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
952 }
953 
954 void
955 rds_handle_send_error(rds_ep_t *ep)
956 {
957 	if (rds_is_sendq_empty(ep, 0)) {
958 		/* Session should already be in ERROR, try to reconnect */
959 		RDS_DPRINTF2("rds_handle_send_error",
960 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
961 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
962 		    (void *)ep->ep_sp, DDI_SLEEP);
963 	}
964 }
965 
966 /*
967  * Called in the CM handler on the passive side
968  * Called on a taskq thread.
969  */
970 void
971 rds_cleanup_passive_session(void *arg)
972 {
973 	rds_session_t	*sp = arg;
974 
975 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
976 	    sp->session_state);
977 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
978 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
979 
980 	rds_session_close(sp, IBT_BLOCKING, 1);
981 
982 	rw_enter(&sp->session_lock, RW_WRITER);
983 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
984 		rds_session_fini(sp);
985 		sp->session_state = RDS_SESSION_STATE_FINI;
986 		sp->session_failover = 0;
987 		RDS_DPRINTF3("rds_cleanup_passive_session",
988 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
989 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
990 		rds_session_fini(sp);
991 		sp->session_state = RDS_SESSION_STATE_FAILED;
992 		sp->session_failover = 0;
993 		RDS_DPRINTF3("rds_cleanup_passive_session",
994 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
995 	}
996 	rw_exit(&sp->session_lock);
997 
998 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
999 }
1000 
1001 /*
1002  * Called by the CM handler on the passive side
1003  * Called with WRITE lock on the session
1004  */
1005 void
1006 rds_passive_session_fini(rds_session_t *sp)
1007 {
1008 	rds_ep_t	*ep;
1009 
1010 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
1011 	    sp->session_state);
1012 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
1013 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
1014 
1015 	/* clean the data channel */
1016 	ep = &sp->session_dataep;
1017 	(void) rds_is_sendq_empty(ep, 1);
1018 	mutex_enter(&ep->ep_lock);
1019 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1020 	    ep->ep_state);
1021 	rds_ep_free_rc_channel(ep);
1022 	mutex_exit(&ep->ep_lock);
1023 
1024 	/* clean the control channel */
1025 	ep = &sp->session_ctrlep;
1026 	(void) rds_is_sendq_empty(ep, 1);
1027 	mutex_enter(&ep->ep_lock);
1028 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1029 	    ep->ep_state);
1030 	rds_ep_free_rc_channel(ep);
1031 	mutex_exit(&ep->ep_lock);
1032 
1033 	rds_session_fini(sp);
1034 	sp->session_failover = 0;
1035 
1036 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
1037 }
1038 
1039 /*
1040  * Can be called:
1041  * 1. on driver detach
1042  * 2. on taskq thread
1043  * arg is always NULL
1044  */
1045 /* ARGSUSED */
1046 void
1047 rds_close_sessions(void *arg)
1048 {
1049 	rds_session_t *sp, *spnextp;
1050 
1051 	RDS_DPRINTF2("rds_close_sessions", "Enter");
1052 
1053 	/* wait until all the buffers are freed by the sockets */
1054 	while (RDS_GET_RXPKTS_PEND() != 0) {
1055 		/* wait one second and try again */
1056 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1057 		    "pending packets", RDS_GET_RXPKTS_PEND());
1058 		delay(drv_usectohz(1000000));
1059 	}
1060 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1061 
1062 	/* close all the sessions */
1063 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1064 	sp = rdsib_statep->rds_sessionlistp;
1065 	while (sp) {
1066 		rw_enter(&sp->session_lock, RW_WRITER);
1067 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1068 		    sp->session_state);
1069 
1070 		switch (sp->session_state) {
1071 		case RDS_SESSION_STATE_CONNECTED:
1072 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1073 			rw_exit(&sp->session_lock);
1074 
1075 			rds_session_close(sp, IBT_BLOCKING, 1);
1076 
1077 			rw_enter(&sp->session_lock, RW_WRITER);
1078 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1079 			RDS_DPRINTF3("rds_close_sessions",
1080 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1081 			rds_session_fini(sp);
1082 			sp->session_state = RDS_SESSION_STATE_FINI;
1083 			sp->session_failover = 0;
1084 			RDS_DPRINTF3("rds_close_sessions",
1085 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1086 			break;
1087 
1088 		case RDS_SESSION_STATE_ERROR:
1089 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
1090 		case RDS_SESSION_STATE_INIT:
1091 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1092 			rw_exit(&sp->session_lock);
1093 
1094 			rds_session_close(sp, IBT_BLOCKING, 1);
1095 
1096 			rw_enter(&sp->session_lock, RW_WRITER);
1097 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1098 			RDS_DPRINTF3("rds_close_sessions",
1099 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1100 			/* FALLTHRU */
1101 		case RDS_SESSION_STATE_CLOSED:
1102 			rds_session_fini(sp);
1103 			sp->session_state = RDS_SESSION_STATE_FINI;
1104 			sp->session_failover = 0;
1105 			RDS_DPRINTF3("rds_close_sessions",
1106 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1107 			break;
1108 		}
1109 
1110 		rw_exit(&sp->session_lock);
1111 		sp = sp->session_nextp;
1112 	}
1113 
1114 	sp = rdsib_statep->rds_sessionlistp;
1115 	rdsib_statep->rds_sessionlistp = NULL;
1116 	rdsib_statep->rds_nsessions = 0;
1117 	rw_exit(&rdsib_statep->rds_sessionlock);
1118 
1119 	while (sp) {
1120 		spnextp = sp->session_nextp;
1121 		rds_destroy_session(sp);
1122 		RDS_DECR_SESS();
1123 		sp = spnextp;
1124 	}
1125 
1126 	/* free the global pool */
1127 	rds_free_recv_caches(rdsib_statep);
1128 
1129 	RDS_DPRINTF2("rds_close_sessions", "Return");
1130 }
1131 
1132 void
1133 rds_session_open(rds_session_t *sp)
1134 {
1135 	int		ret;
1136 
1137 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1138 
1139 	ret = rds_session_connect(sp);
1140 	if (ret == -1) {
1141 		/*
1142 		 * may be the session has become passive due to
1143 		 * hitting peer-to-peer case
1144 		 */
1145 		rw_enter(&sp->session_lock, RW_READER);
1146 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1147 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1148 			    "has become passive from active", sp);
1149 			rw_exit(&sp->session_lock);
1150 			return;
1151 		}
1152 
1153 		/* get the lock for writing */
1154 		rw_exit(&sp->session_lock);
1155 		rw_enter(&sp->session_lock, RW_WRITER);
1156 		sp->session_state = RDS_SESSION_STATE_ERROR;
1157 		RDS_DPRINTF3("rds_session_open",
1158 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1159 		rw_exit(&sp->session_lock);
1160 
1161 		/* Connect request failed */
1162 		rds_session_close(sp, IBT_BLOCKING, 1);
1163 
1164 		rw_enter(&sp->session_lock, RW_WRITER);
1165 		rds_session_fini(sp);
1166 		sp->session_state = RDS_SESSION_STATE_FAILED;
1167 		sp->session_failover = 0;
1168 		RDS_DPRINTF3("rds_session_open",
1169 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1170 		rw_exit(&sp->session_lock);
1171 
1172 		return;
1173 	}
1174 
1175 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1176 }
1177 
1178 /*
1179  * Creates a session and inserts it into the list of sessions. The session
1180  * state would be CREATED.
1181  * Return Values:
1182  *	EWOULDBLOCK
1183  */
1184 rds_session_t *
1185 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1186     ibt_cm_req_rcv_t *reqp, uint8_t type)
1187 {
1188 	ib_gid_t	lgid, rgid;
1189 	rds_session_t	*newp, *oldp;
1190 	rds_ep_t	*dataep, *ctrlep;
1191 	rds_bufpool_t	*pool;
1192 	int		ret;
1193 
1194 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x, type: %d",
1195 	    statep, localip, remip, type);
1196 
1197 	/* Check if there is space for a new session */
1198 	rw_enter(&statep->rds_sessionlock, RW_READER);
1199 	if (statep->rds_nsessions >= (MaxNodes - 1)) {
1200 		rw_exit(&statep->rds_sessionlock);
1201 		RDS_DPRINTF1("rds_session_create", "No More Sessions allowed");
1202 		return (NULL);
1203 	}
1204 	rw_exit(&statep->rds_sessionlock);
1205 
1206 	/* Allocate and initialize global buffer pool */
1207 	ret = rds_init_recv_caches(statep);
1208 	if (ret != 0) {
1209 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1210 		return (NULL);
1211 	}
1212 
1213 	/* enough memory for session (includes 2 endpoints) */
1214 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1215 
1216 	newp->session_remip = remip;
1217 	newp->session_myip = localip;
1218 	newp->session_type = type;
1219 	newp->session_state = RDS_SESSION_STATE_CREATED;
1220 	RDS_DPRINTF3("rds_session_create",
1221 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1222 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1223 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1224 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1225 
1226 	/* Initialize data endpoint */
1227 	dataep = &newp->session_dataep;
1228 	dataep->ep_remip = newp->session_remip;
1229 	dataep->ep_myip = newp->session_myip;
1230 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1231 	dataep->ep_sp = newp;
1232 	dataep->ep_type = RDS_EP_TYPE_DATA;
1233 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1234 
1235 	/* Initialize send pool locks */
1236 	pool = &dataep->ep_sndpool;
1237 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1238 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1239 
1240 	/* Initialize recv pool locks */
1241 	pool = &dataep->ep_rcvpool;
1242 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1243 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1244 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1245 
1246 	/* Initialize control endpoint */
1247 	ctrlep = &newp->session_ctrlep;
1248 	ctrlep->ep_remip = newp->session_remip;
1249 	ctrlep->ep_myip = newp->session_myip;
1250 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1251 	ctrlep->ep_sp = newp;
1252 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1253 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1254 
1255 	/* Initialize send pool locks */
1256 	pool = &ctrlep->ep_sndpool;
1257 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1258 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1259 
1260 	/* Initialize recv pool locks */
1261 	pool = &ctrlep->ep_rcvpool;
1262 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1263 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1264 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1265 
1266 	/* lkup if there is already a session */
1267 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1268 	oldp = rds_session_lkup(statep, remip, 0);
1269 	if (oldp != NULL) {
1270 		/* A session to this destination exists */
1271 		rw_exit(&statep->rds_sessionlock);
1272 		rw_destroy(&newp->session_lock);
1273 		rw_destroy(&newp->session_local_portmap_lock);
1274 		rw_destroy(&newp->session_remote_portmap_lock);
1275 		mutex_destroy(&dataep->ep_lock);
1276 		mutex_destroy(&ctrlep->ep_lock);
1277 		kmem_free(newp, sizeof (rds_session_t));
1278 		return (NULL);
1279 	}
1280 
1281 	/* Insert this session into the list */
1282 	if (rds_add_session(newp, B_TRUE) != B_TRUE) {
1283 		/* No room to add this session */
1284 		rw_exit(&statep->rds_sessionlock);
1285 		rw_destroy(&newp->session_lock);
1286 		rw_destroy(&newp->session_local_portmap_lock);
1287 		rw_destroy(&newp->session_remote_portmap_lock);
1288 		mutex_destroy(&dataep->ep_lock);
1289 		mutex_destroy(&ctrlep->ep_lock);
1290 		kmem_free(newp, sizeof (rds_session_t));
1291 		return (NULL);
1292 	}
1293 
1294 	/* unlock the session list */
1295 	rw_exit(&statep->rds_sessionlock);
1296 
1297 	if (type == RDS_SESSION_ACTIVE) {
1298 		ipaddr_t		localip1, remip1;
1299 		ibt_ip_path_attr_t	ipattr;
1300 		ibt_ip_addr_t		dstip;
1301 
1302 		/* The ipaddr should be in the network order */
1303 		localip1 = localip;
1304 		remip1 = remip;
1305 		ret = rds_sc_path_lookup(&localip1, &remip1);
1306 		if (ret == 0) {
1307 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1308 			    localip, remip);
1309 		}
1310 
1311 		/* Get the gids for the source and destination ip addrs */
1312 		lgid.gid_prefix = 0;
1313 		lgid.gid_guid = 0;
1314 		rgid.gid_prefix = 0;
1315 		rgid.gid_guid = 0;
1316 
1317 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1318 		dstip.family = AF_INET;
1319 		dstip.un.ip4addr = remip1;
1320 		ipattr.ipa_dst_ip = &dstip;
1321 		ipattr.ipa_src_ip.family = AF_INET;
1322 		ipattr.ipa_src_ip.un.ip4addr = localip1;
1323 		ipattr.ipa_ndst = 1;
1324 		ipattr.ipa_max_paths = 1;
1325 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1326 		    localip1, remip1);
1327 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1328 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1329 		    NULL, NULL);
1330 		if (ret != IBT_SUCCESS) {
1331 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d "
1332 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1333 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1334 
1335 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1336 			return (NULL);
1337 		}
1338 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1339 		lgid =
1340 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1341 		rgid =
1342 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1343 
1344 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1345 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1346 		    rgid.gid_guid);
1347 	}
1348 
1349 	rw_enter(&newp->session_lock, RW_WRITER);
1350 	/* check for peer-to-peer case */
1351 	if (type == newp->session_type) {
1352 		/* no peer-to-peer case */
1353 		if (type == RDS_SESSION_ACTIVE) {
1354 			newp->session_lgid = lgid;
1355 			newp->session_rgid = rgid;
1356 		} else {
1357 			/* rgid is requester gid & lgid is receiver gid */
1358 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1359 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1360 		}
1361 	}
1362 	rw_exit(&newp->session_lock);
1363 
1364 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1365 
1366 	return (newp);
1367 }
1368 
1369 void
1370 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1371 {
1372 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1373 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1374 
1375 	switch (cpkt->rcp_code) {
1376 	case RDS_CTRL_CODE_STALL:
1377 		RDS_INCR_STALLS_RCVD();
1378 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1379 		break;
1380 	case RDS_CTRL_CODE_UNSTALL:
1381 		RDS_INCR_UNSTALLS_RCVD();
1382 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1383 		break;
1384 	case RDS_CTRL_CODE_STALL_PORTS:
1385 		rds_mark_all_ports(sp, RDS_REMOTE);
1386 		break;
1387 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1388 		rds_unmark_all_ports(sp, RDS_REMOTE);
1389 		break;
1390 	case RDS_CTRL_CODE_HEARTBEAT:
1391 		break;
1392 	default:
1393 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1394 		    cpkt->rcp_code);
1395 		break;
1396 	}
1397 
1398 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1399 }
1400 
1401 int
1402 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1403 {
1404 	ibt_send_wr_t	wr;
1405 	rds_ep_t	*ep;
1406 	rds_buf_t	*bp;
1407 	rds_ctrl_pkt_t	*cp;
1408 	int		ret;
1409 
1410 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1411 	    "Port: %d", sp, code, port);
1412 
1413 	ep = &sp->session_ctrlep;
1414 
1415 	bp = rds_get_send_buf(ep, 1);
1416 	if (bp == NULL) {
1417 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1418 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1419 		    port);
1420 		return (-1);
1421 	}
1422 
1423 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1424 	cp->rcp_code = code;
1425 	cp->rcp_port = port;
1426 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1427 
1428 	wr.wr_id = (uintptr_t)bp;
1429 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1430 	wr.wr_trans = IBT_RC_SRV;
1431 	wr.wr_opcode = IBT_WRC_SEND;
1432 	wr.wr_nds = 1;
1433 	wr.wr_sgl = &bp->buf_ds;
1434 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1435 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1436 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1437 	if (ret != IBT_SUCCESS) {
1438 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1439 		    "%d", ep, ret);
1440 		bp->buf_state = RDS_SNDBUF_FREE;
1441 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1442 		return (-1);
1443 	}
1444 
1445 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1446 	    "Port: %d", sp, code, port);
1447 
1448 	return (0);
1449 }
1450 
1451 void
1452 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1453 {
1454 	int		ret;
1455 
1456 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1457 
1458 	RDS_INCR_STALLS_TRIGGERED();
1459 
1460 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1461 
1462 		if (sp != NULL) {
1463 			ret = rds_post_control_message(sp,
1464 			    RDS_CTRL_CODE_STALL, port);
1465 			if (ret != 0) {
1466 				(void) rds_check_n_unmark_port(sp, port,
1467 				    qualifier);
1468 				return;
1469 			}
1470 			RDS_INCR_STALLS_SENT();
1471 		}
1472 	} else {
1473 		RDS_DPRINTF3(LABEL,
1474 		    "Port %d is already in stall state", port);
1475 	}
1476 
1477 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1478 }
1479 
1480 void
1481 rds_resume_port(in_port_t port)
1482 {
1483 	rds_session_t	*sp;
1484 	uint_t		ix;
1485 	int		ret;
1486 
1487 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1488 
1489 	RDS_INCR_UNSTALLS_TRIGGERED();
1490 
1491 	/* resume loopback traffic */
1492 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1493 
1494 	/* send unstall messages to resume the remote traffic */
1495 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1496 
1497 	sp = rdsib_statep->rds_sessionlistp;
1498 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1499 		ASSERT(sp != NULL);
1500 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1501 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1502 				ret = rds_post_control_message(sp,
1503 				    RDS_CTRL_CODE_UNSTALL, port);
1504 				if (ret != 0) {
1505 					(void) rds_check_n_mark_port(sp, port,
1506 					    RDS_LOCAL);
1507 				} else {
1508 					RDS_INCR_UNSTALLS_SENT();
1509 				}
1510 		}
1511 
1512 		sp = sp->session_nextp;
1513 	}
1514 
1515 	rw_exit(&rdsib_statep->rds_sessionlock);
1516 
1517 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1518 }
1519 
1520 static int
1521 rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1522     in_port_t recvport)
1523 {
1524 	ibt_send_wr_t	*wrp, wr;
1525 	rds_buf_t	*bp, *bp1;
1526 	rds_data_hdr_t	*pktp;
1527 	uint32_t	msgsize, npkts, residual, pktno, ix;
1528 	int		ret;
1529 
1530 	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
1531 	    ep, uiop);
1532 
1533 	/* how many pkts are needed to carry this msg */
1534 	msgsize = uiop->uio_resid;
1535 	npkts = ((msgsize - 1) / UserBufferSize) + 1;
1536 	residual = ((msgsize - 1) % UserBufferSize) + 1;
1537 
1538 	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
1539 	    msgsize, npkts);
1540 
1541 	/* Get the buffers needed to post this message */
1542 	bp = rds_get_send_buf(ep, npkts);
1543 	if (bp == NULL) {
1544 		RDS_INCR_ENOBUFS();
1545 		return (ENOBUFS);
1546 	}
1547 
1548 	if (npkts > 1) {
1549 		/*
1550 		 * multi-pkt messages are posted at the same time as a list
1551 		 * of WRs
1552 		 */
1553 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
1554 		    npkts, KM_SLEEP);
1555 	}
1556 
1557 
1558 	pktno = 0;
1559 	bp1 = bp;
1560 	do {
1561 		/* prepare the header */
1562 		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1563 		pktp->dh_datalen = UserBufferSize;
1564 		pktp->dh_npkts = npkts - pktno;
1565 		pktp->dh_psn = pktno;
1566 		pktp->dh_sendport = sendport;
1567 		pktp->dh_recvport = recvport;
1568 		bp1->buf_ds.ds_len = RdsPktSize;
1569 
1570 		/* copy the data */
1571 		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
1572 		    UserBufferSize, UIO_WRITE, uiop);
1573 		if (ret != 0) {
1574 			break;
1575 		}
1576 
1577 		if (uiop->uio_resid == 0) {
1578 			pktp->dh_datalen = residual;
1579 			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
1580 			break;
1581 		}
1582 		pktno++;
1583 		bp1 = bp1->buf_nextp;
1584 	} while (uiop->uio_resid);
1585 
1586 	if (ret) {
1587 		/* uiomove failed */
1588 		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
1589 		    uiop, ret);
1590 		if (npkts > 1) {
1591 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1592 		}
1593 		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1594 		return (ret);
1595 	}
1596 
1597 	if (npkts > 1) {
1598 		/* multi-pkt message */
1599 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);
1600 
1601 		bp1 = bp;
1602 		for (ix = 0; ix < npkts; ix++) {
1603 			wrp[ix].wr_id = (uintptr_t)bp1;
1604 			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
1605 			wrp[ix].wr_trans = IBT_RC_SRV;
1606 			wrp[ix].wr_opcode = IBT_WRC_SEND;
1607 			wrp[ix].wr_nds = 1;
1608 			wrp[ix].wr_sgl = &bp1->buf_ds;
1609 			bp1 = bp1->buf_nextp;
1610 		}
1611 		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;
1612 
1613 		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
1614 		if (ret != IBT_SUCCESS) {
1615 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1616 			    "%d for %d pkts", ep, ret, npkts);
1617 			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1618 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1619 			return (ret);
1620 		}
1621 
1622 		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1623 	} else {
1624 		/* single pkt */
1625 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
1626 		wr.wr_id = (uintptr_t)bp;
1627 		wr.wr_flags = IBT_WR_SEND_SOLICIT;
1628 		wr.wr_trans = IBT_RC_SRV;
1629 		wr.wr_opcode = IBT_WRC_SEND;
1630 		wr.wr_nds = 1;
1631 		wr.wr_sgl = &bp->buf_ds;
1632 		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
1633 		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
1634 		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1635 		if (ret != IBT_SUCCESS) {
1636 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1637 			    "%d", ep, ret);
1638 			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1639 			return (ret);
1640 		}
1641 	}
1642 
1643 	RDS_INCR_TXPKTS(npkts);
1644 	RDS_INCR_TXBYTES(msgsize);
1645 
1646 	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
1647 	    ep, uiop);
1648 
1649 	return (0);
1650 }
1651 
1652 static int
1653 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1654     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1655 {
1656 	mblk_t		*mp;
1657 	int		ret;
1658 
1659 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1660 
1661 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1662 	    "%d to recvport: %d", sendport, recvport);
1663 
1664 	mp = allocb(uiop->uio_resid, BPRI_MED);
1665 	if (mp == NULL) {
1666 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1667 		    uiop->uio_resid);
1668 		return (ENOSPC);
1669 	}
1670 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1671 
1672 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1673 	if (ret) {
1674 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1675 		freeb(mp);
1676 		return (ret);
1677 	}
1678 
1679 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1680 	    zoneid);
1681 	if (ret != 0) {
1682 		if (ret == ENOSPC) {
1683 			/*
1684 			 * The message is delivered but cannot take more,
1685 			 * stop further loopback traffic to this port
1686 			 */
1687 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1688 			    "Port %d NO SPACE", recvport);
1689 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1690 		} else {
1691 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1692 			    "port %d failed: %d", sendport, recvport, ret);
1693 			return (ret);
1694 		}
1695 	}
1696 
1697 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1698 	return (0);
1699 }
1700 
1701 static void
1702 rds_resend_messages(void *arg)
1703 {
1704 	rds_session_t	*sp = (rds_session_t *)arg;
1705 	rds_ep_t	*ep;
1706 	rds_bufpool_t	*spool;
1707 	rds_buf_t	*bp, *endp, *tmp;
1708 	ibt_send_wr_t	*wrp;
1709 	uint_t		nwr = 0, ix, jx;
1710 	int		ret;
1711 
1712 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1713 
1714 	ep = &sp->session_dataep;
1715 
1716 	spool = &ep->ep_sndpool;
1717 	mutex_enter(&spool->pool_lock);
1718 
1719 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1720 
1721 	if (ep->ep_lbufid == NULL) {
1722 		RDS_DPRINTF2("rds_resend_messages",
1723 		    "SP(%p) Remote session is cleaned up ", sp);
1724 		/*
1725 		 * The remote end cleaned up its session. There may be loss
1726 		 * of messages. Mark all buffers as acknowledged.
1727 		 */
1728 		tmp = spool->pool_tailp;
1729 	} else {
1730 		tmp = (rds_buf_t *)ep->ep_lbufid;
1731 		RDS_DPRINTF2("rds_resend_messages",
1732 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1733 	}
1734 
1735 	endp = spool->pool_tailp;
1736 	bp = spool->pool_headp;
1737 	jx = 0;
1738 	while ((bp != NULL) && (bp != tmp)) {
1739 		bp->buf_state = RDS_SNDBUF_FREE;
1740 		jx++;
1741 		bp = bp->buf_nextp;
1742 	}
1743 
1744 	if (bp == NULL) {
1745 		mutex_exit(&spool->pool_lock);
1746 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1747 		    "found in the list", tmp);
1748 
1749 		rw_enter(&sp->session_lock, RW_WRITER);
1750 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1751 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1752 		} else {
1753 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1754 			    "Expected State: %d", sp, sp->session_state,
1755 			    RDS_SESSION_STATE_CONNECTED);
1756 		}
1757 		sp->session_failover = 0;
1758 		rw_exit(&sp->session_lock);
1759 		return;
1760 	}
1761 
1762 	/* Found the match */
1763 	bp->buf_state = RDS_SNDBUF_FREE;
1764 	jx++;
1765 
1766 	spool->pool_tailp = bp;
1767 	bp = bp->buf_nextp;
1768 	spool->pool_tailp->buf_nextp = NULL;
1769 	nwr = spool->pool_nfree - jx;
1770 	spool->pool_nfree = jx;
1771 	mutex_exit(&spool->pool_lock);
1772 
1773 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1774 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1775 
1776 	if (bp) {
1777 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1778 		    KM_SLEEP);
1779 
1780 		while (nwr) {
1781 			jx = (nwr > 100) ? 100 : nwr;
1782 
1783 			tmp = bp;
1784 			for (ix = 0; ix < jx; ix++) {
1785 				bp->buf_state = RDS_SNDBUF_PENDING;
1786 				wrp[ix].wr_id = (uintptr_t)bp;
1787 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1788 				wrp[ix].wr_trans = IBT_RC_SRV;
1789 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1790 				wrp[ix].wr_nds = 1;
1791 				wrp[ix].wr_sgl = &bp->buf_ds;
1792 				bp = bp->buf_nextp;
1793 			}
1794 
1795 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1796 			if (ret != IBT_SUCCESS) {
1797 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1798 				    "failed: %d for % pkts", ep, ret, jx);
1799 				break;
1800 			}
1801 
1802 			mutex_enter(&spool->pool_lock);
1803 			spool->pool_nbusy += jx;
1804 			mutex_exit(&spool->pool_lock);
1805 
1806 			nwr -= jx;
1807 		}
1808 
1809 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1810 
1811 		if (nwr != 0) {
1812 
1813 			/*
1814 			 * An error while failover is in progress. Some WRs are
1815 			 * posted while other remain. If any of the posted WRs
1816 			 * complete in error then they would dispatch a taskq to
1817 			 * do a failover. Getting the session lock will prevent
1818 			 * the taskq to wait until we are done here.
1819 			 */
1820 			rw_enter(&sp->session_lock, RW_READER);
1821 
1822 			/*
1823 			 * Wait until all the previous WRs are completed and
1824 			 * then queue the remaining, otherwise the order of
1825 			 * the messages may change.
1826 			 */
1827 			(void) rds_is_sendq_empty(ep, 1);
1828 
1829 			/* free the remaining buffers */
1830 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1831 
1832 			rw_exit(&sp->session_lock);
1833 			return;
1834 		}
1835 	}
1836 
1837 	rw_enter(&sp->session_lock, RW_WRITER);
1838 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1839 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1840 	} else {
1841 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1842 		    "Expected State: %d", sp, sp->session_state,
1843 		    RDS_SESSION_STATE_CONNECTED);
1844 	}
1845 	sp->session_failover = 0;
1846 	rw_exit(&sp->session_lock);
1847 
1848 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1849 }
1850 
1851 /*
1852  * This is called when a channel is connected. Transition the session to
1853  * CONNECTED state iff both channels are connected.
1854  */
1855 void
1856 rds_session_active(rds_session_t *sp)
1857 {
1858 	rds_ep_t	*ep;
1859 	uint_t		failover;
1860 
1861 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1862 
1863 	rw_enter(&sp->session_lock, RW_READER);
1864 
1865 	failover = sp->session_failover;
1866 
1867 	/*
1868 	 * we establish the data channel first, so check the control channel
1869 	 * first but make sure it is initialized.
1870 	 */
1871 	ep = &sp->session_ctrlep;
1872 	mutex_enter(&ep->ep_lock);
1873 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1874 		/* the session is not ready yet */
1875 		mutex_exit(&ep->ep_lock);
1876 		rw_exit(&sp->session_lock);
1877 		return;
1878 	}
1879 	mutex_exit(&ep->ep_lock);
1880 
1881 	/* control channel is connected, check the data channel */
1882 	ep = &sp->session_dataep;
1883 	mutex_enter(&ep->ep_lock);
1884 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1885 		/* data channel is not yet connected */
1886 		mutex_exit(&ep->ep_lock);
1887 		rw_exit(&sp->session_lock);
1888 		return;
1889 	}
1890 	mutex_exit(&ep->ep_lock);
1891 
1892 	if (failover) {
1893 		rw_exit(&sp->session_lock);
1894 
1895 		/*
1896 		 * The session has failed over. Previous msgs have to be
1897 		 * re-sent before the session is moved to the connected
1898 		 * state.
1899 		 */
1900 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1901 		    "to re-send messages", sp);
1902 		(void) ddi_taskq_dispatch(rds_taskq,
1903 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1904 		return;
1905 	}
1906 
1907 	/* the session is ready */
1908 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1909 	RDS_DPRINTF3("rds_session_active",
1910 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1911 
1912 	rw_exit(&sp->session_lock);
1913 
1914 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1915 }
1916 
1917 static int
1918 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1919     in_port_t recvport)
1920 {
1921 	int	ret;
1922 
1923 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1924 	    "%d", ep, sendport, recvport);
1925 
1926 	/* make sure the remote port is not stalled */
1927 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1928 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1929 		    ep->ep_sp, recvport);
1930 		RDS_INCR_EWOULDBLOCK();
1931 		ret = ENOMEM;
1932 	} else {
1933 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1934 	}
1935 
1936 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1937 
1938 	return (ret);
1939 }
1940 
1941 /* Send a message to a destination socket */
1942 int
1943 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1944     in_port_t recvport, zoneid_t zoneid)
1945 {
1946 	rds_session_t	*sp;
1947 	ib_gid_t	lgid, rgid;
1948 	int		ret;
1949 
1950 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1951 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1952 	    sendport, recvport);
1953 
1954 	/* If msg length is 0, just return success */
1955 	if (uiop->uio_resid == 0) {
1956 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1957 		return (0);
1958 	}
1959 
1960 	/* Is there a session to the destination? */
1961 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1962 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1963 	rw_exit(&rdsib_statep->rds_sessionlock);
1964 
1965 	/* Is this a loopback message? */
1966 	if ((sp == NULL) && (rds_islocal(recvip))) {
1967 		/* make sure the port is not stalled */
1968 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
1969 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1970 			    recvport);
1971 			RDS_INCR_EWOULDBLOCK();
1972 			return (ENOMEM);
1973 		}
1974 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1975 		    sendport, zoneid);
1976 		return (ret);
1977 	}
1978 
1979 	/* Not a loopback message */
1980 	if (sp == NULL) {
1981 		/* There is no session to the destination, create one. */
1982 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1983 		    "IP: 0x%x", recvip);
1984 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1985 		    RDS_SESSION_ACTIVE);
1986 		if (sp != NULL) {
1987 			rw_enter(&sp->session_lock, RW_WRITER);
1988 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1989 				ret = rds_session_init(sp);
1990 				if (ret != 0) {
1991 					RDS_DPRINTF2("rds_sendmsg",
1992 					    "SP(%p): rds_session_init failed",
1993 					    sp);
1994 					sp->session_state =
1995 					    RDS_SESSION_STATE_FAILED;
1996 					RDS_DPRINTF3("rds_sendmsg",
1997 					    "SP(%p) State "
1998 					    "RDS_SESSION_STATE_FAILED", sp);
1999 					rw_exit(&sp->session_lock);
2000 					return (EFAULT);
2001 				}
2002 				sp->session_state = RDS_SESSION_STATE_INIT;
2003 				RDS_DPRINTF3("rds_sendmsg",
2004 				    "SP(%p) State "
2005 				    "RDS_SESSION_STATE_INIT", sp);
2006 				rw_exit(&sp->session_lock);
2007 				rds_session_open(sp);
2008 			} else {
2009 				rw_exit(&sp->session_lock);
2010 			}
2011 		} else {
2012 			/* Is a session created for this destination */
2013 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
2014 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
2015 			rw_exit(&rdsib_statep->rds_sessionlock);
2016 			if (sp == NULL) {
2017 				return (EFAULT);
2018 			}
2019 		}
2020 	}
2021 
2022 	/* There is a session to the destination */
2023 	rw_enter(&sp->session_lock, RW_READER);
2024 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2025 		rw_exit(&sp->session_lock);
2026 
2027 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2028 		    recvport);
2029 		return (ret);
2030 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2031 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2032 		ipaddr_t sendip1, recvip1;
2033 
2034 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
2035 		    "%d", sp, sp->session_state);
2036 		rw_exit(&sp->session_lock);
2037 		rw_enter(&sp->session_lock, RW_WRITER);
2038 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2039 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2040 			ibt_ip_path_attr_t	ipattr;
2041 			ibt_ip_addr_t		dstip;
2042 
2043 			sp->session_state = RDS_SESSION_STATE_CREATED;
2044 			sp->session_type = RDS_SESSION_ACTIVE;
2045 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
2046 			    "RDS_SESSION_STATE_CREATED", sp);
2047 			rw_exit(&sp->session_lock);
2048 
2049 
2050 			/* The ipaddr should be in the network order */
2051 			sendip1 = sendip;
2052 			recvip1 = recvip;
2053 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
2054 			if (ret == 0) {
2055 				RDS_DPRINTF2(LABEL, "Path not found "
2056 				    "(0x%x 0x%x)", sendip1, recvip1);
2057 			}
2058 
2059 			/* Resolve the IP addresses */
2060 			lgid.gid_prefix = 0;
2061 			lgid.gid_guid = 0;
2062 			rgid.gid_prefix = 0;
2063 			rgid.gid_guid = 0;
2064 
2065 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
2066 			dstip.family = AF_INET;
2067 			dstip.un.ip4addr = recvip1;
2068 			ipattr.ipa_dst_ip = &dstip;
2069 			ipattr.ipa_src_ip.family = AF_INET;
2070 			ipattr.ipa_src_ip.un.ip4addr = sendip1;
2071 			ipattr.ipa_ndst = 1;
2072 			ipattr.ipa_max_paths = 1;
2073 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2074 			    sendip1, recvip1);
2075 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2076 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2077 			    NULL, NULL);
2078 			if (ret != IBT_SUCCESS) {
2079 				RDS_DPRINTF2("rds_sendmsg",
2080 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2081 
2082 				rw_enter(&sp->session_lock, RW_WRITER);
2083 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2084 					sp->session_state =
2085 					    RDS_SESSION_STATE_FAILED;
2086 					RDS_DPRINTF3("rds_sendmsg",
2087 					    "SP(%p) State "
2088 					    "RDS_SESSION_STATE_FAILED", sp);
2089 					rw_exit(&sp->session_lock);
2090 					return (EFAULT);
2091 				} else {
2092 					rw_exit(&sp->session_lock);
2093 					return (ENOMEM);
2094 				}
2095 			}
2096 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2097 			lgid = sp->session_pinfo.
2098 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2099 			rgid = sp->session_pinfo.
2100 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2101 
2102 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2103 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2104 			    rgid.gid_guid);
2105 
2106 			rw_enter(&sp->session_lock, RW_WRITER);
2107 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2108 				sp->session_lgid = lgid;
2109 				sp->session_rgid = rgid;
2110 				ret = rds_session_init(sp);
2111 				if (ret != 0) {
2112 					RDS_DPRINTF2("rds_sendmsg",
2113 					    "SP(%p): rds_session_init failed",
2114 					    sp);
2115 					sp->session_state =
2116 					    RDS_SESSION_STATE_FAILED;
2117 					RDS_DPRINTF3("rds_sendmsg",
2118 					    "SP(%p) State "
2119 					    "RDS_SESSION_STATE_FAILED", sp);
2120 					rw_exit(&sp->session_lock);
2121 					return (EFAULT);
2122 				}
2123 				sp->session_state = RDS_SESSION_STATE_INIT;
2124 				rw_exit(&sp->session_lock);
2125 
2126 				rds_session_open(sp);
2127 
2128 			} else {
2129 				RDS_DPRINTF2("rds_sendmsg",
2130 				    "SP(%p): type changed to %d",
2131 				    sp, sp->session_type);
2132 				rw_exit(&sp->session_lock);
2133 				return (ENOMEM);
2134 			}
2135 		} else {
2136 			RDS_DPRINTF2("rds_sendmsg",
2137 			    "SP(%p): Session state %d changed",
2138 			    sp, sp->session_state);
2139 			rw_exit(&sp->session_lock);
2140 			return (ENOMEM);
2141 		}
2142 	} else {
2143 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): Session is in %d state",
2144 		    sp, sp->session_state);
2145 		rw_exit(&sp->session_lock);
2146 		return (ENOMEM);
2147 	}
2148 
2149 	rw_enter(&sp->session_lock, RW_READER);
2150 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2151 		rw_exit(&sp->session_lock);
2152 
2153 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2154 		    recvport);
2155 	} else {
2156 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2157 		    sp, sp->session_state);
2158 		rw_exit(&sp->session_lock);
2159 	}
2160 
2161 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2162 
2163 	return (ret);
2164 }
2165 
2166 /* Note: This is called on the CQ handler thread */
2167 void
2168 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2169 {
2170 	mblk_t		*mp, *mp1;
2171 	rds_data_hdr_t	*pktp, *pktp1;
2172 	uint8_t		*datap;
2173 	rds_buf_t	*bp1;
2174 	rds_bufpool_t	*rpool;
2175 	uint_t		npkts, ix;
2176 	int		ret;
2177 
2178 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2179 
2180 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2181 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2182 	npkts = pktp->dh_npkts;
2183 
2184 	/* increment rx pending here */
2185 	rpool = &ep->ep_rcvpool;
2186 	mutex_enter(&rpool->pool_lock);
2187 	rpool->pool_nbusy += npkts;
2188 	mutex_exit(&rpool->pool_lock);
2189 
2190 	/* this will get freed by sockfs */
2191 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2192 	if (mp == NULL) {
2193 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2194 		    ep, bp);
2195 		rds_free_recv_buf(bp, npkts);
2196 		return;
2197 	}
2198 	mp->b_wptr = datap + pktp->dh_datalen;
2199 	mp->b_datap->db_type = M_DATA;
2200 
2201 	mp1 = mp;
2202 	bp1 = bp->buf_nextp;
2203 	while (bp1 != NULL) {
2204 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2205 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2206 		    RDS_DATA_HDR_SZ;
2207 
2208 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2209 		    BPRI_HI, &bp1->buf_frtn);
2210 		if (mp1->b_cont == NULL) {
2211 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2212 			    ep, bp1);
2213 			freemsg(mp);
2214 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2215 			return;
2216 		}
2217 		mp1 = mp1->b_cont;
2218 		mp1->b_wptr = datap + pktp1->dh_datalen;
2219 		mp1->b_datap->db_type = M_DATA;
2220 
2221 		bp1 = bp1->buf_nextp;
2222 	}
2223 
2224 	RDS_INCR_RXPKTS_PEND(npkts);
2225 	RDS_INCR_RXPKTS(npkts);
2226 	RDS_INCR_RXBYTES(msgdsize(mp));
2227 
2228 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2229 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2230 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2231 	    npkts, pktp->dh_psn);
2232 
2233 	/* store the last buffer id, no lock needed */
2234 	if (npkts > 1) {
2235 		ep->ep_rbufid = pktp1->dh_bufid;
2236 	} else {
2237 		ep->ep_rbufid = pktp->dh_bufid;
2238 	}
2239 
2240 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2241 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2242 	if (ret != 0) {
2243 		if (ret == ENOSPC) {
2244 			/*
2245 			 * The message is delivered but cannot take more,
2246 			 * stop further remote messages coming to this port
2247 			 */
2248 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2249 			    pktp->dh_recvport);
2250 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2251 		} else {
2252 			RDS_DPRINTF2(LABEL, "rds_deliver_new_msg returned: %d",
2253 			    ret);
2254 		}
2255 	}
2256 
2257 	mutex_enter(&ep->ep_lock);
2258 	/* The first message can come in before the conn est event */
2259 	if ((ep->ep_rdmacnt == 0) && (ep->ep_state == RDS_EP_STATE_CONNECTED)) {
2260 		ep->ep_rdmacnt++;
2261 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2262 		mutex_exit(&ep->ep_lock);
2263 
2264 		/* send acknowledgement */
2265 		RDS_INCR_TXACKS();
2266 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2267 		if (ret != IBT_SUCCESS) {
2268 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send for "
2269 			    "acknowledgement failed: %d, SQ depth: %d",
2270 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2271 			mutex_enter(&ep->ep_lock);
2272 			ep->ep_rdmacnt--;
2273 			mutex_exit(&ep->ep_lock);
2274 		}
2275 	} else {
2276 		/* no room to send acknowledgement */
2277 		mutex_exit(&ep->ep_lock);
2278 	}
2279 
2280 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2281 }
2282