xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision e13e346d8734036862432c746042cff8470e8ebd)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #pragma ident	"%Z%%M%	%I%	%E% SMI"
76 
77 #include <sys/stream.h>
78 #include <sys/ib/clients/rds/rdsib_cm.h>
79 #include <sys/ib/clients/rds/rdsib_ib.h>
80 #include <sys/ib/clients/rds/rdsib_buf.h>
81 #include <sys/ib/clients/rds/rdsib_ep.h>
82 #include <sys/ib/clients/rds/rds_kstat.h>
83 #include <sys/zone.h>
84 
85 #define	RDS_POLL_CQ_IN_2TICKS	1
86 
87 /*
88  * This File contains the endpoint related calls
89  */
90 
91 extern boolean_t rds_islocal(ipaddr_t addr);
92 extern uint_t rds_wc_signal;
93 
94 #define	RDS_LOOPBACK	0
95 #define	RDS_LOCAL	1
96 #define	RDS_REMOTE	2
97 
98 #define	IBT_IPADDR	1
99 
100 static uint8_t
101 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
102 {
103 	uint8_t	ret;
104 
105 	switch (qualifier) {
106 	case RDS_LOOPBACK: /* loopback */
107 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
108 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
109 		rw_exit(&rds_loopback_portmap_lock);
110 		break;
111 
112 	case RDS_LOCAL: /* Session local */
113 		ASSERT(sp != NULL);
114 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
115 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
116 		rw_exit(&sp->session_local_portmap_lock);
117 		break;
118 
119 	case RDS_REMOTE: /* Session remote */
120 		ASSERT(sp != NULL);
121 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
122 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
123 		rw_exit(&sp->session_remote_portmap_lock);
124 		break;
125 	}
126 
127 	return (ret);
128 }
129 
130 static uint8_t
131 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
132 {
133 	uint8_t	ret;
134 
135 	switch (qualifier) {
136 	case RDS_LOOPBACK: /* loopback */
137 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
138 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
139 		if (!ret) {
140 			/* port is not marked, mark it */
141 			rds_loopback_portmap[port/8] =
142 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
143 		}
144 		rw_exit(&rds_loopback_portmap_lock);
145 		break;
146 
147 	case RDS_LOCAL: /* Session local */
148 		ASSERT(sp != NULL);
149 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
150 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
151 		if (!ret) {
152 			/* port is not marked, mark it */
153 			sp->session_local_portmap[port/8] =
154 			    sp->session_local_portmap[port/8] |
155 			    (1 << (port % 8));
156 		}
157 		rw_exit(&sp->session_local_portmap_lock);
158 		break;
159 
160 	case RDS_REMOTE: /* Session remote */
161 		ASSERT(sp != NULL);
162 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
163 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
164 		if (!ret) {
165 			/* port is not marked, mark it */
166 			sp->session_remote_portmap[port/8] =
167 			    sp->session_remote_portmap[port/8] |
168 			    (1 << (port % 8));
169 		}
170 		rw_exit(&sp->session_remote_portmap_lock);
171 		break;
172 	}
173 
174 	return (ret);
175 }
176 
177 static uint8_t
178 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
179 {
180 	uint8_t	ret;
181 
182 	switch (qualifier) {
183 	case RDS_LOOPBACK: /* loopback */
184 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
185 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
186 		if (ret) {
187 			/* port is marked, unmark it */
188 			rds_loopback_portmap[port/8] =
189 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
190 		}
191 		rw_exit(&rds_loopback_portmap_lock);
192 		break;
193 
194 	case RDS_LOCAL: /* Session local */
195 		ASSERT(sp != NULL);
196 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
197 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
198 		if (ret) {
199 			/* port is marked, unmark it */
200 			sp->session_local_portmap[port/8] =
201 			    sp->session_local_portmap[port/8] &
202 			    ~(1 << (port % 8));
203 		}
204 		rw_exit(&sp->session_local_portmap_lock);
205 		break;
206 
207 	case RDS_REMOTE: /* Session remote */
208 		ASSERT(sp != NULL);
209 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
210 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
211 		if (ret) {
212 			/* port is marked, unmark it */
213 			sp->session_remote_portmap[port/8] =
214 			    sp->session_remote_portmap[port/8] &
215 			    ~(1 << (port % 8));
216 		}
217 		rw_exit(&sp->session_remote_portmap_lock);
218 		break;
219 	}
220 
221 	return (ret);
222 }
223 
224 static void
225 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
226 {
227 	switch (qualifier) {
228 	case RDS_LOOPBACK: /* loopback */
229 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
230 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
231 		rw_exit(&rds_loopback_portmap_lock);
232 		break;
233 
234 	case RDS_LOCAL: /* Session local */
235 		ASSERT(sp != NULL);
236 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
237 		(void) memset(sp->session_local_portmap, 0xFF,
238 		    RDS_PORT_MAP_SIZE);
239 		rw_exit(&sp->session_local_portmap_lock);
240 		break;
241 
242 	case RDS_REMOTE: /* Session remote */
243 		ASSERT(sp != NULL);
244 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
245 		(void) memset(sp->session_remote_portmap, 0xFF,
246 		    RDS_PORT_MAP_SIZE);
247 		rw_exit(&sp->session_remote_portmap_lock);
248 		break;
249 	}
250 }
251 
252 static void
253 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
254 {
255 	switch (qualifier) {
256 	case RDS_LOOPBACK: /* loopback */
257 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
258 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
259 		rw_exit(&rds_loopback_portmap_lock);
260 		break;
261 
262 	case RDS_LOCAL: /* Session local */
263 		ASSERT(sp != NULL);
264 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
265 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
266 		rw_exit(&sp->session_local_portmap_lock);
267 		break;
268 
269 	case RDS_REMOTE: /* Session remote */
270 		ASSERT(sp != NULL);
271 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
272 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
273 		rw_exit(&sp->session_remote_portmap_lock);
274 		break;
275 	}
276 }
277 
278 static boolean_t
279 rds_add_session(rds_session_t *sp, boolean_t locked)
280 {
281 	boolean_t retval = B_TRUE;
282 
283 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
284 
285 	if (!locked) {
286 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
287 	}
288 
289 	/* Don't allow more sessions than configured in rdsib.conf */
290 	if (rdsib_statep->rds_nsessions >= (MaxNodes - 1)) {
291 		RDS_DPRINTF1("rds_add_session", "Max session limit reached");
292 		retval = B_FALSE;
293 	} else {
294 		sp->session_nextp = rdsib_statep->rds_sessionlistp;
295 		rdsib_statep->rds_sessionlistp = sp;
296 		rdsib_statep->rds_nsessions++;
297 		RDS_INCR_SESS();
298 	}
299 
300 	if (!locked) {
301 		rw_exit(&rdsib_statep->rds_sessionlock);
302 	}
303 
304 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
305 
306 	return (retval);
307 }
308 
309 /* Session lookup based on destination IP or destination node guid */
310 rds_session_t *
311 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
312 {
313 	rds_session_t	*sp;
314 
315 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
316 	    remoteip, node_guid);
317 
318 	/* A read/write lock is expected, will panic if none of them are held */
319 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
320 	sp = statep->rds_sessionlistp;
321 	while (sp) {
322 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
323 		    (sp->session_rgid.gid_guid == node_guid))) {
324 			break;
325 		}
326 
327 		sp = sp->session_nextp;
328 	}
329 
330 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
331 
332 	return (sp);
333 }
334 
335 boolean_t
336 rds_session_lkup_by_sp(rds_session_t *sp)
337 {
338 	rds_session_t *sessionp;
339 
340 	RDS_DPRINTF4("rds_session_lkup_by_sp", "Enter: 0x%p", sp);
341 
342 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
343 	sessionp = rdsib_statep->rds_sessionlistp;
344 	while (sessionp) {
345 		if (sessionp == sp) {
346 			rw_exit(&rdsib_statep->rds_sessionlock);
347 			return (B_TRUE);
348 		}
349 
350 		sessionp = sessionp->session_nextp;
351 	}
352 	rw_exit(&rdsib_statep->rds_sessionlock);
353 
354 	return (B_FALSE);
355 }
356 
/*
 * Release the per-endpoint resources: free the send pool, free the receive
 * pool and then clear the HCA GUID recorded in the endpoint.
 */
static void
rds_ep_fini(rds_ep_t *ep)
{
	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);

	/* free send pool */
	rds_free_send_pool(ep);

	/* free recv pool */
	rds_free_recv_pool(ep);

	/* ep_hca_guid is updated under ep_lock */
	mutex_enter(&ep->ep_lock);
	ep->ep_hca_guid = 0;
	mutex_exit(&ep->ep_lock);

	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
}
374 
375 /* Assumes SP write lock is held */
376 int
377 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
378 {
379 	uint_t		ret;
380 
381 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
382 
383 	/* send pool */
384 	ret = rds_init_send_pool(ep, hca_guid);
385 	if (ret != 0) {
386 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
387 		    ep, ret);
388 		return (-1);
389 	}
390 
391 	/* recv pool */
392 	ret = rds_init_recv_pool(ep);
393 	if (ret != 0) {
394 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
395 		    ep, ret);
396 		rds_free_send_pool(ep);
397 		return (-1);
398 	}
399 
400 	/* reset the ep state */
401 	mutex_enter(&ep->ep_lock);
402 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
403 	ep->ep_hca_guid = hca_guid;
404 	ep->ep_lbufid = NULL;
405 	ep->ep_rbufid = NULL;
406 	ep->ep_segfbp = NULL;
407 	ep->ep_seglbp = NULL;
408 
409 	/* Initialize the WR to send acknowledgements */
410 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
411 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
412 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
413 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
414 	ep->ep_ackwr.wr_nds = 1;
415 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
416 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
417 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
418 	mutex_exit(&ep->ep_lock);
419 
420 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
421 
422 	return (0);
423 }
424 
/*
 * Re-initialize an endpoint for the HCA identified by 'hca_guid': the send
 * pool is re-initialized and all buffers in the receive pool are freed.
 *
 * Returns 0 on success and -1 if the send pool could not be re-initialized
 * (in which case the receive pool is left untouched).
 */
static int
rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
{
	int	ret;

	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
	    ep, ep->ep_type);

	/* Re-initialize send pool */
	ret = rds_reinit_send_pool(ep, hca_guid);
	if (ret != 0) {
		RDS_DPRINTF2("rds_ep_reinit",
		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
		return (-1);
	}

	/* free all the receive buffers in the pool */
	rds_free_recv_pool(ep);

	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
	    ep, ep->ep_type);

	return (0);
}
449 
/*
 * Release the endpoint resources of a session: the data endpoint first,
 * then the control endpoint.  The session structure itself is not freed
 * here.
 */
void
rds_session_fini(rds_session_t *sp)
{
	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);

	rds_ep_fini(&sp->session_dataep);
	rds_ep_fini(&sp->session_ctrlep);

	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
}
460 
/*
 * Allocate and initialize the resources needed for the control and
 * data channels
 *
 * The HCA is looked up from the session's local GID (session_lgid).  The
 * control endpoint is initialized first, then the data endpoint; if the
 * data endpoint fails, the control endpoint is torn down again.  On success
 * both session port maps are cleared.
 *
 * Returns 0 on success, -1 if the local GID is not on an initialized HCA
 * or if either endpoint could not be initialized.
 */
int
rds_session_init(rds_session_t *sp)
{
	int		ret;
	rds_hca_t	*hcap;
	ib_guid_t	hca_guid;

	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);

	/* CALLED WITH SESSION WRITE LOCK */

	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
	if (hcap == NULL) {
		RDS_DPRINTF2("rds_session_init", "SGID is on an uninitialized "
		    "HCA: %llx", sp->session_lgid.gid_guid);
		return (-1);
	}

	hca_guid = hcap->hca_guid;

	/* allocate and initialize the ctrl channel */
	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
	if (ret != 0) {
		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
		    "failed", sp, &sp->session_ctrlep);
		return (-1);
	}

	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);

	/* allocate and initialize the data channel */
	ret = rds_ep_init(&sp->session_dataep, hca_guid);
	if (ret != 0) {
		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
		    "failed", sp, &sp->session_dataep);
		/* undo the control endpoint initialized above */
		rds_ep_fini(&sp->session_ctrlep);
		return (-1);
	}

	/* Clear the portmaps */
	rds_unmark_all_ports(sp, RDS_LOCAL);
	rds_unmark_all_ports(sp, RDS_REMOTE);

	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);

	RDS_DPRINTF2("rds_session_init", "Return");

	return (0);
}
514 
/*
 * This should be called before moving a session from ERROR state to
 * INIT state. This will update the HCA keys in case the session has moved
 * from one HCA to another.
 *
 * 'lgid' is the local GID the session is going to use.  The port maps are
 * cleared and the data endpoint's acknowledgement word is reset before the
 * HCA lookups, so both happen even when -1 is returned.
 *
 * Returns 0 on success and -1 if 'lgid' is not on an initialized HCA or an
 * endpoint could not be re-initialized.  When the old and new GIDs are on
 * the same HCA, 0 is returned without touching the endpoints and without
 * storing 'lgid' in sp->session_lgid.
 */
int
rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
{
	rds_hca_t	*hcap, *hcap1;
	int		ret;

	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p)", sp);

	/* CALLED WITH SESSION WRITE LOCK */

	/* Clear the portmaps */
	rds_unmark_all_ports(sp, RDS_LOCAL);
	rds_unmark_all_ports(sp, RDS_REMOTE);

	/* make the last buffer as the acknowledged */
	*(uintptr_t *)sp->session_dataep.ep_ack_addr =
	    (uintptr_t)sp->session_dataep.ep_sndpool.pool_tailp;

	/* hcap: the HCA that owns the new local GID */
	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
	if (hcap == NULL) {
		RDS_DPRINTF2("rds_session_reinit", "SGID is on an "
		    "uninitialized HCA: %llx", lgid.gid_guid);
		return (-1);
	}

	/* hcap1: the HCA that owns the GID the session used so far */
	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
	if (hcap1 == NULL) {
		/* only traced; the endpoints are re-initialized below */
		RDS_DPRINTF2("rds_session_reinit", "Seems like HCA %llx "
		    "is unplugged", sp->session_lgid.gid_guid);
	} else if (hcap->hca_guid == hcap1->hca_guid) {
		/*
		 * No action is needed as the session did not move across
		 * HCAs
		 */
		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
		return (0);
	}

	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");

	/* re-initialize the control channel */
	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
	if (ret != 0) {
		RDS_DPRINTF2("rds_session_reinit",
		    "SP(%p): Ctrl EP(%p) re-initialization failed",
		    sp, &sp->session_ctrlep);
		return (-1);
	}

	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
	    sp, &sp->session_ctrlep);

	/* re-initialize the data channel */
	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
	if (ret != 0) {
		RDS_DPRINTF2("rds_session_reinit",
		    "SP(%p): Data EP(%p) re-initialization failed",
		    sp, &sp->session_dataep);
		return (-1);
	}

	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
	    sp, &sp->session_dataep);

	/* record the new local GID only after both endpoints succeeded */
	sp->session_lgid = lgid;

	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);

	return (0);
}
590 
/*
 * Open the RC channels of an active session using the path stored in
 * sp->session_pinfo.  The data channel is connected first, then the control
 * channel; both opens use IBT_BLOCKING.
 *
 * Returns 0 when both channels are connected, or when the session turned
 * passive in the meantime (nothing is connected in that case).  Returns -1
 * if an endpoint is not in RDS_EP_STATE_UNCONNECTED or if
 * rds_open_rc_channel() fails.
 *
 * NOTE(review): when rds_open_rc_channel() fails, the endpoint is left in
 * RDS_EP_STATE_ACTIVE_PENDING by this function, and a data channel that was
 * already opened is not closed here - presumably the caller cleans up;
 * confirm against the callers.
 */
static int
rds_session_connect(rds_session_t *sp)
{
	ibt_channel_hdl_t	ctrlchan, datachan;
	rds_ep_t		*ep;
	int			ret;

	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);

	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;

	/* Override the packet life time based on the conf file */
	if (IBPktLifeTime != 0) {
		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
		    IBPktLifeTime;
	}

	/* Session type may change if we run into peer-to-peer case. */
	rw_enter(&sp->session_lock, RW_READER);
	if (sp->session_type == RDS_SESSION_PASSIVE) {
		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
		    "active end", sp);
		rw_exit(&sp->session_lock);
		return (0); /* return success */
	}
	rw_exit(&sp->session_lock);

	/* connect the data ep first */
	ep = &sp->session_dataep;
	mutex_enter(&ep->ep_lock);
	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
		/* ep_lock is dropped before the blocking open */
		mutex_exit(&ep->ep_lock);
		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
		    &datachan);
		if (ret != IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
			    "failed: %d", ep, ret);
			return (-1);
		}
		sp->session_dataep.ep_chanhdl = datachan;
	} else {
		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
		    "unexpected state: %d", sp, ep, ep->ep_state);
		mutex_exit(&ep->ep_lock);
		return (-1);
	}

	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
	    sp, ep);

	/* now the control ep, following the same steps */
	ep = &sp->session_ctrlep;
	mutex_enter(&ep->ep_lock);
	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
		mutex_exit(&ep->ep_lock);
		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
		    &ctrlchan);
		if (ret != IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
			    "failed: %d", ep, ret);
			return (-1);
		}
		sp->session_ctrlep.ep_chanhdl = ctrlchan;
	} else {
		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
		    "unexpected state: %d", sp, ep, ep->ep_state);
		mutex_exit(&ep->ep_lock);
		return (-1);
	}

	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
	    sp, sp->session_myip, sp->session_remip);

	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);

	return (0);
}
669 
670 /*
671  * Can be called with or without session_lock.
672  */
673 void
674 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
675 {
676 	rds_ep_t		*ep;
677 
678 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
679 	    sp->session_state);
680 
681 	ep = &sp->session_dataep;
682 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
683 
684 	/* wait until the SQ is empty before closing */
685 	if (wait != 0) {
686 		(void) rds_is_sendq_empty(ep, wait);
687 	}
688 
689 	mutex_enter(&ep->ep_lock);
690 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
691 		mutex_exit(&ep->ep_lock);
692 		delay(drv_usectohz(300000));
693 		mutex_enter(&ep->ep_lock);
694 	}
695 
696 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
697 		ep->ep_state = RDS_EP_STATE_CLOSING;
698 		mutex_exit(&ep->ep_lock);
699 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
700 		if (wait == 0) {
701 			/* make sure all WCs are flushed before proceeding */
702 			(void) rds_is_sendq_empty(ep, 1);
703 		}
704 		mutex_enter(&ep->ep_lock);
705 	}
706 	rds_ep_free_rc_channel(ep);
707 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
708 	ep->ep_segfbp = NULL;
709 	ep->ep_seglbp = NULL;
710 	mutex_exit(&ep->ep_lock);
711 
712 	ep = &sp->session_ctrlep;
713 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
714 
715 	/* wait until the SQ is empty before closing */
716 	if (wait != 0) {
717 		(void) rds_is_sendq_empty(ep, wait);
718 	}
719 
720 	mutex_enter(&ep->ep_lock);
721 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
722 		mutex_exit(&ep->ep_lock);
723 		delay(drv_usectohz(300000));
724 		mutex_enter(&ep->ep_lock);
725 	}
726 
727 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
728 		ep->ep_state = RDS_EP_STATE_CLOSING;
729 		mutex_exit(&ep->ep_lock);
730 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
731 		if (wait == 0) {
732 			/* make sure all WCs are flushed before proceeding */
733 			(void) rds_is_sendq_empty(ep, 1);
734 		}
735 		mutex_enter(&ep->ep_lock);
736 	}
737 	rds_ep_free_rc_channel(ep);
738 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
739 	ep->ep_segfbp = NULL;
740 	ep->ep_seglbp = NULL;
741 	mutex_exit(&ep->ep_lock);
742 
743 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
744 }
745 
746 /* Free the session */
747 static void
748 rds_destroy_session(rds_session_t *sp)
749 {
750 	rds_ep_t	*ep;
751 	rds_bufpool_t	*pool;
752 
753 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
754 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
755 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
756 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
757 
758 	rw_enter(&sp->session_lock, RW_READER);
759 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
760 	    sp->session_state);
761 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
762 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
763 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
764 		rw_exit(&sp->session_lock);
765 		delay(drv_usectohz(1000000));
766 		rw_enter(&sp->session_lock, RW_READER);
767 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
768 		    "ON SESSION", sp, sp->session_state);
769 	}
770 	rw_exit(&sp->session_lock);
771 
772 	/* data channel */
773 	ep = &sp->session_dataep;
774 
775 	/* send pool locks */
776 	pool = &ep->ep_sndpool;
777 	cv_destroy(&pool->pool_cv);
778 	mutex_destroy(&pool->pool_lock);
779 
780 	/* recv pool locks */
781 	pool = &ep->ep_rcvpool;
782 	cv_destroy(&pool->pool_cv);
783 	mutex_destroy(&pool->pool_lock);
784 	mutex_destroy(&ep->ep_recvqp.qp_lock);
785 
786 	/* control channel */
787 	ep = &sp->session_ctrlep;
788 
789 	/* send pool locks */
790 	pool = &ep->ep_sndpool;
791 	cv_destroy(&pool->pool_cv);
792 	mutex_destroy(&pool->pool_lock);
793 
794 	/* recv pool locks */
795 	pool = &ep->ep_rcvpool;
796 	cv_destroy(&pool->pool_cv);
797 	mutex_destroy(&pool->pool_lock);
798 	mutex_destroy(&ep->ep_recvqp.qp_lock);
799 
800 	/* session */
801 	rw_destroy(&sp->session_lock);
802 	rw_destroy(&sp->session_local_portmap_lock);
803 	rw_destroy(&sp->session_remote_portmap_lock);
804 
805 	/* free the session */
806 	kmem_free(sp, sizeof (rds_session_t));
807 
808 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
809 }
810 
/*
 * Try to re-establish an active session; dispatched from
 * rds_handle_send_error().
 * This is called on the taskq thread
 *
 * 'arg' is the rds_session_t pointer.  Steps, as implemented below:
 *  1. Return if the session is no longer on the global session list, or
 *     if it is no longer the active end.
 *  2. Set session_failover, close both channels and wait one second.
 *  3. Try up to 5 times, one second apart, to get a path between the
 *     session's IP addresses with ibt_get_ip_paths().
 *  4. If no path was found, release the endpoint resources and mark the
 *     session FAILED (only if it is still the active end).
 *  5. Otherwise re-initialize the session for the new local GID, move it
 *     to RDS_SESSION_STATE_INIT and call rds_session_open().
 */
static void
rds_failover_session(void *arg)
{
	rds_session_t	*sp = (rds_session_t *)arg;
	ib_gid_t	lgid, rgid;
	ipaddr_t	myip, remip;
	int		ret, cnt = 0;

	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);

	/* Make sure the session is still alive */
	if (rds_session_lkup_by_sp(sp) == B_FALSE) {
		RDS_DPRINTF2("rds_failover_session",
		    "Return: SP(%p) not ALIVE", sp);
		return;
	}

	RDS_INCR_FAILOVERS();

	rw_enter(&sp->session_lock, RW_WRITER);
	if (sp->session_type != RDS_SESSION_ACTIVE) {
		/*
		 * The remote side must have seen the error and initiated
		 * a re-connect.
		 */
		RDS_DPRINTF2("rds_failover_session",
		    "SP(%p) has become passive", sp);
		rw_exit(&sp->session_lock);
		return;
	}
	sp->session_failover = 1;
	rw_exit(&sp->session_lock);

	/*
	 * The session is in ERROR state but close both channels
	 * for a clean start.
	 */
	rds_session_close(sp, IBT_BLOCKING, 1);

	/* wait 1 sec before re-connecting */
	delay(drv_usectohz(1000000));

	do {
		ibt_ip_path_attr_t	ipattr;
		ibt_ip_addr_t		dstip;

		/* The ipaddr should be in the network order */
		myip = sp->session_myip;
		remip = sp->session_remip;
		/* the result is only traced; ret is overwritten below */
		ret = rds_sc_path_lookup(&myip, &remip);
		if (ret == 0) {
			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
			    myip, remip);
		}
		/* check if we have (new) path from the source to destination */
		lgid.gid_prefix = 0;
		lgid.gid_guid = 0;
		rgid.gid_prefix = 0;
		rgid.gid_guid = 0;

		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
		dstip.family = AF_INET;
		dstip.un.ip4addr = htonl(remip);
		ipattr.ipa_dst_ip = &dstip;
		ipattr.ipa_src_ip.family = AF_INET;
		ipattr.ipa_src_ip.un.ip4addr = htonl(myip);
		ipattr.ipa_ndst = 1;
		ipattr.ipa_max_paths = 1;
		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
		    myip, remip);
		/* the path found is stored directly in sp->session_pinfo */
		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
		if (ret == IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
			lgid = sp->session_pinfo.
			    pi_prim_cep_path.cep_adds_vect.av_sgid;
			rgid = sp->session_pinfo.
			    pi_prim_cep_path.cep_adds_vect.av_dgid;
			break;
		}

		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);

		/* wait 1 sec before re-trying */
		delay(drv_usectohz(1000000));
		cnt++;
	} while (cnt < 5);

	/* ret holds the status of the last ibt_get_ip_paths() call */
	if (ret != IBT_SUCCESS) {
		rw_enter(&sp->session_lock, RW_WRITER);
		if (sp->session_type == RDS_SESSION_ACTIVE) {
			rds_session_fini(sp);
			sp->session_state = RDS_SESSION_STATE_FAILED;
			sp->session_failover = 0;
			RDS_DPRINTF3("rds_failover_session",
			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
		} else {
			RDS_DPRINTF2("rds_failover_session",
			    "SP(%p) has become passive", sp);
		}
		rw_exit(&sp->session_lock);
		return;
	}

	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
	    rgid.gid_guid);

	rw_enter(&sp->session_lock, RW_WRITER);
	if (sp->session_type != RDS_SESSION_ACTIVE) {
		/*
		 * The remote side must have seen the error and initiated
		 * a re-connect.
		 *
		 * NOTE(review): session_failover is still set on this
		 * path - confirm that the passive side clears it.
		 */
		RDS_DPRINTF2("rds_failover_session",
		    "SP(%p) has become passive", sp);
		rw_exit(&sp->session_lock);
		return;
	}

	/* move the session to init state */
	ret = rds_session_reinit(sp, lgid);
	/*
	 * The GIDs are stored whether or not the reinit succeeded;
	 * rds_session_reinit() itself does not store lgid when the
	 * failover stays on the same HCA.
	 */
	sp->session_lgid = lgid;
	sp->session_rgid = rgid;
	if (ret != 0) {
		rds_session_fini(sp);
		sp->session_state = RDS_SESSION_STATE_FAILED;
		sp->session_failover = 0;
		RDS_DPRINTF3("rds_failover_session",
		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
		rw_exit(&sp->session_lock);
		return;
	} else {
		sp->session_state = RDS_SESSION_STATE_INIT;
		RDS_DPRINTF3("rds_failover_session",
		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
	}
	rw_exit(&sp->session_lock);

	rds_session_open(sp);

	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
}
955 
956 void
957 rds_handle_send_error(rds_ep_t *ep)
958 {
959 	if (rds_is_sendq_empty(ep, 0)) {
960 		/* Session should already be in ERROR, try to reconnect */
961 		RDS_DPRINTF2("rds_handle_send_error",
962 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
963 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
964 		    (void *)ep->ep_sp, DDI_SLEEP);
965 	}
966 }
967 
968 /*
969  * Called in the CM handler on the passive side
970  * Called on a taskq thread.
971  */
972 void
973 rds_cleanup_passive_session(void *arg)
974 {
975 	rds_session_t	*sp = arg;
976 
977 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
978 	    sp->session_state);
979 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
980 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
981 
982 	rds_session_close(sp, IBT_BLOCKING, 1);
983 
984 	rw_enter(&sp->session_lock, RW_WRITER);
985 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
986 		rds_session_fini(sp);
987 		sp->session_state = RDS_SESSION_STATE_FINI;
988 		sp->session_failover = 0;
989 		RDS_DPRINTF3("rds_cleanup_passive_session",
990 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
991 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
992 		rds_session_fini(sp);
993 		sp->session_state = RDS_SESSION_STATE_FAILED;
994 		sp->session_failover = 0;
995 		RDS_DPRINTF3("rds_cleanup_passive_session",
996 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
997 	}
998 	rw_exit(&sp->session_lock);
999 
1000 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
1001 }
1002 
1003 /*
1004  * Called by the CM handler on the passive side
1005  * Called with WRITE lock on the session
1006  */
1007 void
1008 rds_passive_session_fini(rds_session_t *sp)
1009 {
1010 	rds_ep_t	*ep;
1011 
1012 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
1013 	    sp->session_state);
1014 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
1015 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
1016 
1017 	/* clean the data channel */
1018 	ep = &sp->session_dataep;
1019 	(void) rds_is_sendq_empty(ep, 1);
1020 	mutex_enter(&ep->ep_lock);
1021 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1022 	    ep->ep_state);
1023 	rds_ep_free_rc_channel(ep);
1024 	mutex_exit(&ep->ep_lock);
1025 
1026 	/* clean the control channel */
1027 	ep = &sp->session_ctrlep;
1028 	(void) rds_is_sendq_empty(ep, 1);
1029 	mutex_enter(&ep->ep_lock);
1030 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1031 	    ep->ep_state);
1032 	rds_ep_free_rc_channel(ep);
1033 	mutex_exit(&ep->ep_lock);
1034 
1035 	rds_session_fini(sp);
1036 	sp->session_failover = 0;
1037 
1038 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
1039 }
1040 
1041 /*
1042  * Can be called:
1043  * 1. on driver detach
1044  * 2. on taskq thread
1045  * arg is always NULL
1046  */
1047 /* ARGSUSED */
1048 void
1049 rds_close_sessions(void *arg)
1050 {
1051 	rds_session_t *sp, *spnextp;
1052 
1053 	RDS_DPRINTF2("rds_close_sessions", "Enter");
1054 
1055 	/* wait until all the buffers are freed by the sockets */
1056 	while (RDS_GET_RXPKTS_PEND() != 0) {
1057 		/* wait one second and try again */
1058 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1059 		    "pending packets", RDS_GET_RXPKTS_PEND());
1060 		delay(drv_usectohz(1000000));
1061 	}
1062 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1063 
1064 	/* close all the sessions */
1065 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1066 	sp = rdsib_statep->rds_sessionlistp;
1067 	while (sp) {
1068 		rw_enter(&sp->session_lock, RW_WRITER);
1069 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1070 		    sp->session_state);
1071 
1072 		switch (sp->session_state) {
1073 		case RDS_SESSION_STATE_CONNECTED:
1074 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1075 			rw_exit(&sp->session_lock);
1076 
1077 			rds_session_close(sp, IBT_BLOCKING, 1);
1078 
1079 			rw_enter(&sp->session_lock, RW_WRITER);
1080 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1081 			RDS_DPRINTF3("rds_close_sessions",
1082 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1083 			rds_session_fini(sp);
1084 			sp->session_state = RDS_SESSION_STATE_FINI;
1085 			sp->session_failover = 0;
1086 			RDS_DPRINTF3("rds_close_sessions",
1087 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1088 			break;
1089 
1090 		case RDS_SESSION_STATE_ERROR:
1091 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
1092 		case RDS_SESSION_STATE_INIT:
1093 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1094 			rw_exit(&sp->session_lock);
1095 
1096 			rds_session_close(sp, IBT_BLOCKING, 1);
1097 
1098 			rw_enter(&sp->session_lock, RW_WRITER);
1099 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1100 			RDS_DPRINTF3("rds_close_sessions",
1101 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1102 			/* FALLTHRU */
1103 		case RDS_SESSION_STATE_CLOSED:
1104 			rds_session_fini(sp);
1105 			sp->session_state = RDS_SESSION_STATE_FINI;
1106 			sp->session_failover = 0;
1107 			RDS_DPRINTF3("rds_close_sessions",
1108 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1109 			break;
1110 		}
1111 
1112 		rw_exit(&sp->session_lock);
1113 		sp = sp->session_nextp;
1114 	}
1115 
1116 	sp = rdsib_statep->rds_sessionlistp;
1117 	rdsib_statep->rds_sessionlistp = NULL;
1118 	rdsib_statep->rds_nsessions = 0;
1119 	rw_exit(&rdsib_statep->rds_sessionlock);
1120 
1121 	while (sp) {
1122 		spnextp = sp->session_nextp;
1123 		rds_destroy_session(sp);
1124 		RDS_DECR_SESS();
1125 		sp = spnextp;
1126 	}
1127 
1128 	/* free the global pool */
1129 	rds_free_recv_caches(rdsib_statep);
1130 
1131 	RDS_DPRINTF2("rds_close_sessions", "Return");
1132 }
1133 
1134 void
1135 rds_session_open(rds_session_t *sp)
1136 {
1137 	int		ret;
1138 
1139 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1140 
1141 	ret = rds_session_connect(sp);
1142 	if (ret == -1) {
1143 		/*
1144 		 * may be the session has become passive due to
1145 		 * hitting peer-to-peer case
1146 		 */
1147 		rw_enter(&sp->session_lock, RW_READER);
1148 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1149 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1150 			    "has become passive from active", sp);
1151 			rw_exit(&sp->session_lock);
1152 			return;
1153 		}
1154 
1155 		/* get the lock for writing */
1156 		rw_exit(&sp->session_lock);
1157 		rw_enter(&sp->session_lock, RW_WRITER);
1158 		sp->session_state = RDS_SESSION_STATE_ERROR;
1159 		RDS_DPRINTF3("rds_session_open",
1160 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1161 		rw_exit(&sp->session_lock);
1162 
1163 		/* Connect request failed */
1164 		rds_session_close(sp, IBT_BLOCKING, 1);
1165 
1166 		rw_enter(&sp->session_lock, RW_WRITER);
1167 		rds_session_fini(sp);
1168 		sp->session_state = RDS_SESSION_STATE_FAILED;
1169 		sp->session_failover = 0;
1170 		RDS_DPRINTF3("rds_session_open",
1171 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1172 		rw_exit(&sp->session_lock);
1173 
1174 		return;
1175 	}
1176 
1177 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1178 }
1179 
1180 /*
1181  * Creates a session and inserts it into the list of sessions. The session
1182  * state would be CREATED.
1183  * Return Values:
1184  *	EWOULDBLOCK
1185  */
1186 rds_session_t *
1187 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1188     ibt_cm_req_rcv_t *reqp, uint8_t type)
1189 {
1190 	ib_gid_t	lgid, rgid;
1191 	rds_session_t	*newp, *oldp;
1192 	rds_ep_t	*dataep, *ctrlep;
1193 	rds_bufpool_t	*pool;
1194 	int		ret;
1195 
1196 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x, type: %d",
1197 	    statep, localip, remip, type);
1198 
1199 	/* Check if there is space for a new session */
1200 	rw_enter(&statep->rds_sessionlock, RW_READER);
1201 	if (statep->rds_nsessions >= (MaxNodes - 1)) {
1202 		rw_exit(&statep->rds_sessionlock);
1203 		RDS_DPRINTF1("rds_session_create", "No More Sessions allowed");
1204 		return (NULL);
1205 	}
1206 	rw_exit(&statep->rds_sessionlock);
1207 
1208 	/* Allocate and initialize global buffer pool */
1209 	ret = rds_init_recv_caches(statep);
1210 	if (ret != 0) {
1211 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1212 		return (NULL);
1213 	}
1214 
1215 	/* enough memory for session (includes 2 endpoints) */
1216 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1217 
1218 	newp->session_remip = remip;
1219 	newp->session_myip = localip;
1220 	newp->session_type = type;
1221 	newp->session_state = RDS_SESSION_STATE_CREATED;
1222 	RDS_DPRINTF3("rds_session_create",
1223 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1224 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1225 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1226 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1227 
1228 	/* Initialize data endpoint */
1229 	dataep = &newp->session_dataep;
1230 	dataep->ep_remip = newp->session_remip;
1231 	dataep->ep_myip = newp->session_myip;
1232 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1233 	dataep->ep_sp = newp;
1234 	dataep->ep_type = RDS_EP_TYPE_DATA;
1235 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1236 
1237 	/* Initialize send pool locks */
1238 	pool = &dataep->ep_sndpool;
1239 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1240 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1241 
1242 	/* Initialize recv pool locks */
1243 	pool = &dataep->ep_rcvpool;
1244 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1245 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1246 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1247 
1248 	/* Initialize control endpoint */
1249 	ctrlep = &newp->session_ctrlep;
1250 	ctrlep->ep_remip = newp->session_remip;
1251 	ctrlep->ep_myip = newp->session_myip;
1252 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1253 	ctrlep->ep_sp = newp;
1254 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1255 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1256 
1257 	/* Initialize send pool locks */
1258 	pool = &ctrlep->ep_sndpool;
1259 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1260 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1261 
1262 	/* Initialize recv pool locks */
1263 	pool = &ctrlep->ep_rcvpool;
1264 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1265 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1266 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1267 
1268 	/* lkup if there is already a session */
1269 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1270 	oldp = rds_session_lkup(statep, remip, 0);
1271 	if (oldp != NULL) {
1272 		/* A session to this destination exists */
1273 		rw_exit(&statep->rds_sessionlock);
1274 		rw_destroy(&newp->session_lock);
1275 		rw_destroy(&newp->session_local_portmap_lock);
1276 		rw_destroy(&newp->session_remote_portmap_lock);
1277 		mutex_destroy(&dataep->ep_lock);
1278 		mutex_destroy(&ctrlep->ep_lock);
1279 		kmem_free(newp, sizeof (rds_session_t));
1280 		return (NULL);
1281 	}
1282 
1283 	/* Insert this session into the list */
1284 	if (rds_add_session(newp, B_TRUE) != B_TRUE) {
1285 		/* No room to add this session */
1286 		rw_exit(&statep->rds_sessionlock);
1287 		rw_destroy(&newp->session_lock);
1288 		rw_destroy(&newp->session_local_portmap_lock);
1289 		rw_destroy(&newp->session_remote_portmap_lock);
1290 		mutex_destroy(&dataep->ep_lock);
1291 		mutex_destroy(&ctrlep->ep_lock);
1292 		kmem_free(newp, sizeof (rds_session_t));
1293 		return (NULL);
1294 	}
1295 
1296 	/* unlock the session list */
1297 	rw_exit(&statep->rds_sessionlock);
1298 
1299 	if (type == RDS_SESSION_ACTIVE) {
1300 		ipaddr_t		localip1, remip1;
1301 		ibt_ip_path_attr_t	ipattr;
1302 		ibt_ip_addr_t		dstip;
1303 
1304 		/* The ipaddr should be in the network order */
1305 		localip1 = localip;
1306 		remip1 = remip;
1307 		ret = rds_sc_path_lookup(&localip1, &remip1);
1308 		if (ret == 0) {
1309 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1310 			    localip, remip);
1311 		}
1312 
1313 		/* Get the gids for the source and destination ip addrs */
1314 		lgid.gid_prefix = 0;
1315 		lgid.gid_guid = 0;
1316 		rgid.gid_prefix = 0;
1317 		rgid.gid_guid = 0;
1318 
1319 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1320 		dstip.family = AF_INET;
1321 		dstip.un.ip4addr = ntohl(remip1);
1322 		ipattr.ipa_dst_ip = &dstip;
1323 		ipattr.ipa_src_ip.family = AF_INET;
1324 		ipattr.ipa_src_ip.un.ip4addr = ntohl(localip1);
1325 		ipattr.ipa_ndst = 1;
1326 		ipattr.ipa_max_paths = 1;
1327 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1328 		    localip1, remip1);
1329 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1330 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1331 		    NULL, NULL);
1332 		if (ret != IBT_SUCCESS) {
1333 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d "
1334 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1335 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1336 
1337 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1338 			return (NULL);
1339 		}
1340 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1341 		lgid =
1342 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1343 		rgid =
1344 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1345 
1346 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1347 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1348 		    rgid.gid_guid);
1349 	}
1350 
1351 	rw_enter(&newp->session_lock, RW_WRITER);
1352 	/* check for peer-to-peer case */
1353 	if (type == newp->session_type) {
1354 		/* no peer-to-peer case */
1355 		if (type == RDS_SESSION_ACTIVE) {
1356 			newp->session_lgid = lgid;
1357 			newp->session_rgid = rgid;
1358 		} else {
1359 			/* rgid is requester gid & lgid is receiver gid */
1360 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1361 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1362 		}
1363 	}
1364 	rw_exit(&newp->session_lock);
1365 
1366 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1367 
1368 	return (newp);
1369 }
1370 
1371 void
1372 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1373 {
1374 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1375 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1376 
1377 	switch (cpkt->rcp_code) {
1378 	case RDS_CTRL_CODE_STALL:
1379 		RDS_INCR_STALLS_RCVD();
1380 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1381 		break;
1382 	case RDS_CTRL_CODE_UNSTALL:
1383 		RDS_INCR_UNSTALLS_RCVD();
1384 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1385 		break;
1386 	case RDS_CTRL_CODE_STALL_PORTS:
1387 		rds_mark_all_ports(sp, RDS_REMOTE);
1388 		break;
1389 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1390 		rds_unmark_all_ports(sp, RDS_REMOTE);
1391 		break;
1392 	case RDS_CTRL_CODE_HEARTBEAT:
1393 		break;
1394 	default:
1395 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1396 		    cpkt->rcp_code);
1397 		break;
1398 	}
1399 
1400 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1401 }
1402 
1403 int
1404 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1405 {
1406 	ibt_send_wr_t	wr;
1407 	rds_ep_t	*ep;
1408 	rds_buf_t	*bp;
1409 	rds_ctrl_pkt_t	*cp;
1410 	int		ret;
1411 
1412 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1413 	    "Port: %d", sp, code, port);
1414 
1415 	ep = &sp->session_ctrlep;
1416 
1417 	bp = rds_get_send_buf(ep, 1);
1418 	if (bp == NULL) {
1419 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1420 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1421 		    port);
1422 		return (-1);
1423 	}
1424 
1425 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1426 	cp->rcp_code = code;
1427 	cp->rcp_port = port;
1428 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1429 
1430 	wr.wr_id = (uintptr_t)bp;
1431 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1432 	wr.wr_trans = IBT_RC_SRV;
1433 	wr.wr_opcode = IBT_WRC_SEND;
1434 	wr.wr_nds = 1;
1435 	wr.wr_sgl = &bp->buf_ds;
1436 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1437 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1438 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1439 	if (ret != IBT_SUCCESS) {
1440 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1441 		    "%d", ep, ret);
1442 		bp->buf_state = RDS_SNDBUF_FREE;
1443 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1444 		return (-1);
1445 	}
1446 
1447 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1448 	    "Port: %d", sp, code, port);
1449 
1450 	return (0);
1451 }
1452 
1453 void
1454 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1455 {
1456 	int		ret;
1457 
1458 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1459 
1460 	RDS_INCR_STALLS_TRIGGERED();
1461 
1462 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1463 
1464 		if (sp != NULL) {
1465 			ret = rds_post_control_message(sp,
1466 			    RDS_CTRL_CODE_STALL, port);
1467 			if (ret != 0) {
1468 				(void) rds_check_n_unmark_port(sp, port,
1469 				    qualifier);
1470 				return;
1471 			}
1472 			RDS_INCR_STALLS_SENT();
1473 		}
1474 	} else {
1475 		RDS_DPRINTF3(LABEL,
1476 		    "Port %d is already in stall state", port);
1477 	}
1478 
1479 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1480 }
1481 
1482 void
1483 rds_resume_port(in_port_t port)
1484 {
1485 	rds_session_t	*sp;
1486 	uint_t		ix;
1487 	int		ret;
1488 
1489 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1490 
1491 	RDS_INCR_UNSTALLS_TRIGGERED();
1492 
1493 	/* resume loopback traffic */
1494 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1495 
1496 	/* send unstall messages to resume the remote traffic */
1497 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1498 
1499 	sp = rdsib_statep->rds_sessionlistp;
1500 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1501 		ASSERT(sp != NULL);
1502 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1503 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1504 				ret = rds_post_control_message(sp,
1505 				    RDS_CTRL_CODE_UNSTALL, port);
1506 				if (ret != 0) {
1507 					(void) rds_check_n_mark_port(sp, port,
1508 					    RDS_LOCAL);
1509 				} else {
1510 					RDS_INCR_UNSTALLS_SENT();
1511 				}
1512 		}
1513 
1514 		sp = sp->session_nextp;
1515 	}
1516 
1517 	rw_exit(&rdsib_statep->rds_sessionlock);
1518 
1519 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1520 }
1521 
1522 static int
1523 rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1524     in_port_t recvport)
1525 {
1526 	ibt_send_wr_t	*wrp, wr;
1527 	rds_buf_t	*bp, *bp1;
1528 	rds_data_hdr_t	*pktp;
1529 	uint32_t	msgsize, npkts, residual, pktno, ix;
1530 	int		ret;
1531 
1532 	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
1533 	    ep, uiop);
1534 
1535 	/* how many pkts are needed to carry this msg */
1536 	msgsize = uiop->uio_resid;
1537 	npkts = ((msgsize - 1) / UserBufferSize) + 1;
1538 	residual = ((msgsize - 1) % UserBufferSize) + 1;
1539 
1540 	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
1541 	    msgsize, npkts);
1542 
1543 	/* Get the buffers needed to post this message */
1544 	bp = rds_get_send_buf(ep, npkts);
1545 	if (bp == NULL) {
1546 		RDS_INCR_ENOBUFS();
1547 		return (ENOBUFS);
1548 	}
1549 
1550 	if (npkts > 1) {
1551 		/*
1552 		 * multi-pkt messages are posted at the same time as a list
1553 		 * of WRs
1554 		 */
1555 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
1556 		    npkts, KM_SLEEP);
1557 	}
1558 
1559 
1560 	pktno = 0;
1561 	bp1 = bp;
1562 	do {
1563 		/* prepare the header */
1564 		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1565 		pktp->dh_datalen = UserBufferSize;
1566 		pktp->dh_npkts = npkts - pktno;
1567 		pktp->dh_psn = pktno;
1568 		pktp->dh_sendport = sendport;
1569 		pktp->dh_recvport = recvport;
1570 		bp1->buf_ds.ds_len = RdsPktSize;
1571 
1572 		/* copy the data */
1573 		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
1574 		    UserBufferSize, UIO_WRITE, uiop);
1575 		if (ret != 0) {
1576 			break;
1577 		}
1578 
1579 		if (uiop->uio_resid == 0) {
1580 			pktp->dh_datalen = residual;
1581 			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
1582 			break;
1583 		}
1584 		pktno++;
1585 		bp1 = bp1->buf_nextp;
1586 	} while (uiop->uio_resid);
1587 
1588 	if (ret) {
1589 		/* uiomove failed */
1590 		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
1591 		    uiop, ret);
1592 		if (npkts > 1) {
1593 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1594 		}
1595 		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1596 		return (ret);
1597 	}
1598 
1599 	if (npkts > 1) {
1600 		/* multi-pkt message */
1601 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);
1602 
1603 		bp1 = bp;
1604 		for (ix = 0; ix < npkts; ix++) {
1605 			wrp[ix].wr_id = (uintptr_t)bp1;
1606 			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
1607 			wrp[ix].wr_trans = IBT_RC_SRV;
1608 			wrp[ix].wr_opcode = IBT_WRC_SEND;
1609 			wrp[ix].wr_nds = 1;
1610 			wrp[ix].wr_sgl = &bp1->buf_ds;
1611 			bp1 = bp1->buf_nextp;
1612 		}
1613 		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;
1614 
1615 		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
1616 		if (ret != IBT_SUCCESS) {
1617 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1618 			    "%d for %d pkts", ep, ret, npkts);
1619 			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1620 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1621 			return (ret);
1622 		}
1623 
1624 		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1625 	} else {
1626 		/* single pkt */
1627 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
1628 		wr.wr_id = (uintptr_t)bp;
1629 		wr.wr_flags = IBT_WR_SEND_SOLICIT;
1630 		wr.wr_trans = IBT_RC_SRV;
1631 		wr.wr_opcode = IBT_WRC_SEND;
1632 		wr.wr_nds = 1;
1633 		wr.wr_sgl = &bp->buf_ds;
1634 		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
1635 		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
1636 		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1637 		if (ret != IBT_SUCCESS) {
1638 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1639 			    "%d", ep, ret);
1640 			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1641 			return (ret);
1642 		}
1643 	}
1644 
1645 	RDS_INCR_TXPKTS(npkts);
1646 	RDS_INCR_TXBYTES(msgsize);
1647 
1648 	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
1649 	    ep, uiop);
1650 
1651 	return (0);
1652 }
1653 
1654 static int
1655 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1656     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1657 {
1658 	mblk_t		*mp;
1659 	int		ret;
1660 
1661 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1662 
1663 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1664 	    "%d to recvport: %d", sendport, recvport);
1665 
1666 	mp = allocb(uiop->uio_resid, BPRI_MED);
1667 	if (mp == NULL) {
1668 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1669 		    uiop->uio_resid);
1670 		return (ENOSPC);
1671 	}
1672 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1673 
1674 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1675 	if (ret) {
1676 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1677 		freeb(mp);
1678 		return (ret);
1679 	}
1680 
1681 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1682 	    zoneid);
1683 	if (ret != 0) {
1684 		if (ret == ENOSPC) {
1685 			/*
1686 			 * The message is delivered but cannot take more,
1687 			 * stop further loopback traffic to this port
1688 			 */
1689 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1690 			    "Port %d NO SPACE", recvport);
1691 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1692 		} else {
1693 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1694 			    "port %d failed: %d", sendport, recvport, ret);
1695 			return (ret);
1696 		}
1697 	}
1698 
1699 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1700 	return (0);
1701 }
1702 
1703 static void
1704 rds_resend_messages(void *arg)
1705 {
1706 	rds_session_t	*sp = (rds_session_t *)arg;
1707 	rds_ep_t	*ep;
1708 	rds_bufpool_t	*spool;
1709 	rds_buf_t	*bp, *endp, *tmp;
1710 	ibt_send_wr_t	*wrp;
1711 	uint_t		nwr = 0, ix, jx;
1712 	int		ret;
1713 
1714 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1715 
1716 	ep = &sp->session_dataep;
1717 
1718 	spool = &ep->ep_sndpool;
1719 	mutex_enter(&spool->pool_lock);
1720 
1721 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1722 
1723 	if (ep->ep_lbufid == NULL) {
1724 		RDS_DPRINTF2("rds_resend_messages",
1725 		    "SP(%p) Remote session is cleaned up ", sp);
1726 		/*
1727 		 * The remote end cleaned up its session. There may be loss
1728 		 * of messages. Mark all buffers as acknowledged.
1729 		 */
1730 		tmp = spool->pool_tailp;
1731 	} else {
1732 		tmp = (rds_buf_t *)ep->ep_lbufid;
1733 		RDS_DPRINTF2("rds_resend_messages",
1734 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1735 	}
1736 
1737 	endp = spool->pool_tailp;
1738 	bp = spool->pool_headp;
1739 	jx = 0;
1740 	while ((bp != NULL) && (bp != tmp)) {
1741 		bp->buf_state = RDS_SNDBUF_FREE;
1742 		jx++;
1743 		bp = bp->buf_nextp;
1744 	}
1745 
1746 	if (bp == NULL) {
1747 		mutex_exit(&spool->pool_lock);
1748 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1749 		    "found in the list", tmp);
1750 
1751 		rw_enter(&sp->session_lock, RW_WRITER);
1752 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1753 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1754 		} else {
1755 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1756 			    "Expected State: %d", sp, sp->session_state,
1757 			    RDS_SESSION_STATE_CONNECTED);
1758 		}
1759 		sp->session_failover = 0;
1760 		rw_exit(&sp->session_lock);
1761 		return;
1762 	}
1763 
1764 	/* Found the match */
1765 	bp->buf_state = RDS_SNDBUF_FREE;
1766 	jx++;
1767 
1768 	spool->pool_tailp = bp;
1769 	bp = bp->buf_nextp;
1770 	spool->pool_tailp->buf_nextp = NULL;
1771 	nwr = spool->pool_nfree - jx;
1772 	spool->pool_nfree = jx;
1773 	mutex_exit(&spool->pool_lock);
1774 
1775 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1776 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1777 
1778 	if (bp) {
1779 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1780 		    KM_SLEEP);
1781 
1782 		while (nwr) {
1783 			jx = (nwr > 100) ? 100 : nwr;
1784 
1785 			tmp = bp;
1786 			for (ix = 0; ix < jx; ix++) {
1787 				bp->buf_state = RDS_SNDBUF_PENDING;
1788 				wrp[ix].wr_id = (uintptr_t)bp;
1789 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1790 				wrp[ix].wr_trans = IBT_RC_SRV;
1791 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1792 				wrp[ix].wr_nds = 1;
1793 				wrp[ix].wr_sgl = &bp->buf_ds;
1794 				bp = bp->buf_nextp;
1795 			}
1796 
1797 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1798 			if (ret != IBT_SUCCESS) {
1799 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1800 				    "failed: %d for % pkts", ep, ret, jx);
1801 				break;
1802 			}
1803 
1804 			mutex_enter(&spool->pool_lock);
1805 			spool->pool_nbusy += jx;
1806 			mutex_exit(&spool->pool_lock);
1807 
1808 			nwr -= jx;
1809 		}
1810 
1811 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1812 
1813 		if (nwr != 0) {
1814 
1815 			/*
1816 			 * An error while failover is in progress. Some WRs are
1817 			 * posted while other remain. If any of the posted WRs
1818 			 * complete in error then they would dispatch a taskq to
1819 			 * do a failover. Getting the session lock will prevent
1820 			 * the taskq to wait until we are done here.
1821 			 */
1822 			rw_enter(&sp->session_lock, RW_READER);
1823 
1824 			/*
1825 			 * Wait until all the previous WRs are completed and
1826 			 * then queue the remaining, otherwise the order of
1827 			 * the messages may change.
1828 			 */
1829 			(void) rds_is_sendq_empty(ep, 1);
1830 
1831 			/* free the remaining buffers */
1832 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1833 
1834 			rw_exit(&sp->session_lock);
1835 			return;
1836 		}
1837 	}
1838 
1839 	rw_enter(&sp->session_lock, RW_WRITER);
1840 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1841 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1842 	} else {
1843 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1844 		    "Expected State: %d", sp, sp->session_state,
1845 		    RDS_SESSION_STATE_CONNECTED);
1846 	}
1847 	sp->session_failover = 0;
1848 	rw_exit(&sp->session_lock);
1849 
1850 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1851 }
1852 
1853 /*
1854  * This is called when a channel is connected. Transition the session to
1855  * CONNECTED state iff both channels are connected.
1856  */
1857 void
1858 rds_session_active(rds_session_t *sp)
1859 {
1860 	rds_ep_t	*ep;
1861 	uint_t		failover;
1862 
1863 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1864 
1865 	rw_enter(&sp->session_lock, RW_READER);
1866 
1867 	failover = sp->session_failover;
1868 
1869 	/*
1870 	 * we establish the data channel first, so check the control channel
1871 	 * first but make sure it is initialized.
1872 	 */
1873 	ep = &sp->session_ctrlep;
1874 	mutex_enter(&ep->ep_lock);
1875 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1876 		/* the session is not ready yet */
1877 		mutex_exit(&ep->ep_lock);
1878 		rw_exit(&sp->session_lock);
1879 		return;
1880 	}
1881 	mutex_exit(&ep->ep_lock);
1882 
1883 	/* control channel is connected, check the data channel */
1884 	ep = &sp->session_dataep;
1885 	mutex_enter(&ep->ep_lock);
1886 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1887 		/* data channel is not yet connected */
1888 		mutex_exit(&ep->ep_lock);
1889 		rw_exit(&sp->session_lock);
1890 		return;
1891 	}
1892 	mutex_exit(&ep->ep_lock);
1893 
1894 	if (failover) {
1895 		rw_exit(&sp->session_lock);
1896 
1897 		/*
1898 		 * The session has failed over. Previous msgs have to be
1899 		 * re-sent before the session is moved to the connected
1900 		 * state.
1901 		 */
1902 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1903 		    "to re-send messages", sp);
1904 		(void) ddi_taskq_dispatch(rds_taskq,
1905 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1906 		return;
1907 	}
1908 
1909 	/* the session is ready */
1910 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1911 	RDS_DPRINTF3("rds_session_active",
1912 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1913 
1914 	rw_exit(&sp->session_lock);
1915 
1916 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1917 }
1918 
1919 static int
1920 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1921     in_port_t recvport)
1922 {
1923 	int	ret;
1924 
1925 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1926 	    "%d", ep, sendport, recvport);
1927 
1928 	/* make sure the remote port is not stalled */
1929 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1930 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1931 		    ep->ep_sp, recvport);
1932 		RDS_INCR_EWOULDBLOCK();
1933 		ret = ENOMEM;
1934 	} else {
1935 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1936 	}
1937 
1938 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1939 
1940 	return (ret);
1941 }
1942 
1943 /* Send a message to a destination socket */
1944 int
1945 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1946     in_port_t recvport, zoneid_t zoneid)
1947 {
1948 	rds_session_t	*sp;
1949 	ib_gid_t	lgid, rgid;
1950 	int		ret;
1951 
1952 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1953 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1954 	    sendport, recvport);
1955 
1956 	/* If msg length is 0, just return success */
1957 	if (uiop->uio_resid == 0) {
1958 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1959 		return (0);
1960 	}
1961 
1962 	/* Is there a session to the destination? */
1963 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1964 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1965 	rw_exit(&rdsib_statep->rds_sessionlock);
1966 
1967 	/* Is this a loopback message? */
1968 	if ((sp == NULL) && (rds_islocal(recvip))) {
1969 		/* make sure the port is not stalled */
1970 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
1971 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1972 			    recvport);
1973 			RDS_INCR_EWOULDBLOCK();
1974 			return (ENOMEM);
1975 		}
1976 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1977 		    sendport, zoneid);
1978 		return (ret);
1979 	}
1980 
1981 	/* Not a loopback message */
1982 	if (sp == NULL) {
1983 		/* There is no session to the destination, create one. */
1984 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1985 		    "IP: 0x%x", recvip);
1986 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1987 		    RDS_SESSION_ACTIVE);
1988 		if (sp != NULL) {
1989 			rw_enter(&sp->session_lock, RW_WRITER);
1990 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1991 				ret = rds_session_init(sp);
1992 				if (ret != 0) {
1993 					RDS_DPRINTF2("rds_sendmsg",
1994 					    "SP(%p): rds_session_init failed",
1995 					    sp);
1996 					sp->session_state =
1997 					    RDS_SESSION_STATE_FAILED;
1998 					RDS_DPRINTF3("rds_sendmsg",
1999 					    "SP(%p) State "
2000 					    "RDS_SESSION_STATE_FAILED", sp);
2001 					rw_exit(&sp->session_lock);
2002 					return (EFAULT);
2003 				}
2004 				sp->session_state = RDS_SESSION_STATE_INIT;
2005 				RDS_DPRINTF3("rds_sendmsg",
2006 				    "SP(%p) State "
2007 				    "RDS_SESSION_STATE_INIT", sp);
2008 				rw_exit(&sp->session_lock);
2009 				rds_session_open(sp);
2010 			} else {
2011 				rw_exit(&sp->session_lock);
2012 			}
2013 		} else {
2014 			/* Is a session created for this destination */
2015 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
2016 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
2017 			rw_exit(&rdsib_statep->rds_sessionlock);
2018 			if (sp == NULL) {
2019 				return (EFAULT);
2020 			}
2021 		}
2022 	}
2023 
2024 	/* There is a session to the destination */
2025 	rw_enter(&sp->session_lock, RW_READER);
2026 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2027 		rw_exit(&sp->session_lock);
2028 
2029 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2030 		    recvport);
2031 		return (ret);
2032 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2033 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2034 		ipaddr_t sendip1, recvip1;
2035 
2036 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
2037 		    "%d", sp, sp->session_state);
2038 		rw_exit(&sp->session_lock);
2039 		rw_enter(&sp->session_lock, RW_WRITER);
2040 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2041 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2042 			ibt_ip_path_attr_t	ipattr;
2043 			ibt_ip_addr_t		dstip;
2044 
2045 			sp->session_state = RDS_SESSION_STATE_CREATED;
2046 			sp->session_type = RDS_SESSION_ACTIVE;
2047 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
2048 			    "RDS_SESSION_STATE_CREATED", sp);
2049 			rw_exit(&sp->session_lock);
2050 
2051 
2052 			/* The ipaddr should be in the network order */
2053 			sendip1 = sendip;
2054 			recvip1 = recvip;
2055 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
2056 			if (ret == 0) {
2057 				RDS_DPRINTF2(LABEL, "Path not found "
2058 				    "(0x%x 0x%x)", sendip1, recvip1);
2059 			}
2060 
2061 			/* Resolve the IP addresses */
2062 			lgid.gid_prefix = 0;
2063 			lgid.gid_guid = 0;
2064 			rgid.gid_prefix = 0;
2065 			rgid.gid_guid = 0;
2066 
2067 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
2068 			dstip.family = AF_INET;
2069 			dstip.un.ip4addr = htonl(recvip1);
2070 			ipattr.ipa_dst_ip = &dstip;
2071 			ipattr.ipa_src_ip.family = AF_INET;
2072 			ipattr.ipa_src_ip.un.ip4addr = htonl(sendip1);
2073 			ipattr.ipa_ndst = 1;
2074 			ipattr.ipa_max_paths = 1;
2075 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2076 			    sendip1, recvip1);
2077 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2078 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2079 			    NULL, NULL);
2080 			if (ret != IBT_SUCCESS) {
2081 				RDS_DPRINTF2("rds_sendmsg",
2082 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2083 
2084 				rw_enter(&sp->session_lock, RW_WRITER);
2085 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2086 					sp->session_state =
2087 					    RDS_SESSION_STATE_FAILED;
2088 					RDS_DPRINTF3("rds_sendmsg",
2089 					    "SP(%p) State "
2090 					    "RDS_SESSION_STATE_FAILED", sp);
2091 					rw_exit(&sp->session_lock);
2092 					return (EFAULT);
2093 				} else {
2094 					rw_exit(&sp->session_lock);
2095 					return (ENOMEM);
2096 				}
2097 			}
2098 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2099 			lgid = sp->session_pinfo.
2100 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2101 			rgid = sp->session_pinfo.
2102 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2103 
2104 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2105 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2106 			    rgid.gid_guid);
2107 
2108 			rw_enter(&sp->session_lock, RW_WRITER);
2109 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2110 				sp->session_lgid = lgid;
2111 				sp->session_rgid = rgid;
2112 				ret = rds_session_init(sp);
2113 				if (ret != 0) {
2114 					RDS_DPRINTF2("rds_sendmsg",
2115 					    "SP(%p): rds_session_init failed",
2116 					    sp);
2117 					sp->session_state =
2118 					    RDS_SESSION_STATE_FAILED;
2119 					RDS_DPRINTF3("rds_sendmsg",
2120 					    "SP(%p) State "
2121 					    "RDS_SESSION_STATE_FAILED", sp);
2122 					rw_exit(&sp->session_lock);
2123 					return (EFAULT);
2124 				}
2125 				sp->session_state = RDS_SESSION_STATE_INIT;
2126 				rw_exit(&sp->session_lock);
2127 
2128 				rds_session_open(sp);
2129 
2130 			} else {
2131 				RDS_DPRINTF2("rds_sendmsg",
2132 				    "SP(%p): type changed to %d",
2133 				    sp, sp->session_type);
2134 				rw_exit(&sp->session_lock);
2135 				return (ENOMEM);
2136 			}
2137 		} else {
2138 			RDS_DPRINTF2("rds_sendmsg",
2139 			    "SP(%p): Session state %d changed",
2140 			    sp, sp->session_state);
2141 			rw_exit(&sp->session_lock);
2142 			return (ENOMEM);
2143 		}
2144 	} else {
2145 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): Session is in %d state",
2146 		    sp, sp->session_state);
2147 		rw_exit(&sp->session_lock);
2148 		return (ENOMEM);
2149 	}
2150 
2151 	rw_enter(&sp->session_lock, RW_READER);
2152 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2153 		rw_exit(&sp->session_lock);
2154 
2155 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2156 		    recvport);
2157 	} else {
2158 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2159 		    sp, sp->session_state);
2160 		rw_exit(&sp->session_lock);
2161 	}
2162 
2163 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2164 
2165 	return (ret);
2166 }
2167 
2168 /* Note: This is called on the CQ handler thread */
2169 void
2170 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2171 {
2172 	mblk_t		*mp, *mp1;
2173 	rds_data_hdr_t	*pktp, *pktp1;
2174 	uint8_t		*datap;
2175 	rds_buf_t	*bp1;
2176 	rds_bufpool_t	*rpool;
2177 	uint_t		npkts, ix;
2178 	int		ret;
2179 
2180 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2181 
2182 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2183 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2184 	npkts = pktp->dh_npkts;
2185 
2186 	/* increment rx pending here */
2187 	rpool = &ep->ep_rcvpool;
2188 	mutex_enter(&rpool->pool_lock);
2189 	rpool->pool_nbusy += npkts;
2190 	mutex_exit(&rpool->pool_lock);
2191 
2192 	/* this will get freed by sockfs */
2193 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2194 	if (mp == NULL) {
2195 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2196 		    ep, bp);
2197 		rds_free_recv_buf(bp, npkts);
2198 		return;
2199 	}
2200 	mp->b_wptr = datap + pktp->dh_datalen;
2201 	mp->b_datap->db_type = M_DATA;
2202 
2203 	mp1 = mp;
2204 	bp1 = bp->buf_nextp;
2205 	while (bp1 != NULL) {
2206 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2207 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2208 		    RDS_DATA_HDR_SZ;
2209 
2210 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2211 		    BPRI_HI, &bp1->buf_frtn);
2212 		if (mp1->b_cont == NULL) {
2213 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2214 			    ep, bp1);
2215 			freemsg(mp);
2216 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2217 			return;
2218 		}
2219 		mp1 = mp1->b_cont;
2220 		mp1->b_wptr = datap + pktp1->dh_datalen;
2221 		mp1->b_datap->db_type = M_DATA;
2222 
2223 		bp1 = bp1->buf_nextp;
2224 	}
2225 
2226 	RDS_INCR_RXPKTS_PEND(npkts);
2227 	RDS_INCR_RXPKTS(npkts);
2228 	RDS_INCR_RXBYTES(msgdsize(mp));
2229 
2230 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2231 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2232 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2233 	    npkts, pktp->dh_psn);
2234 
2235 	/* store the last buffer id, no lock needed */
2236 	if (npkts > 1) {
2237 		ep->ep_rbufid = pktp1->dh_bufid;
2238 	} else {
2239 		ep->ep_rbufid = pktp->dh_bufid;
2240 	}
2241 
2242 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2243 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2244 	if (ret != 0) {
2245 		if (ret == ENOSPC) {
2246 			/*
2247 			 * The message is delivered but cannot take more,
2248 			 * stop further remote messages coming to this port
2249 			 */
2250 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2251 			    pktp->dh_recvport);
2252 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2253 		} else {
2254 			RDS_DPRINTF2(LABEL, "rds_deliver_new_msg returned: %d",
2255 			    ret);
2256 		}
2257 	}
2258 
2259 	mutex_enter(&ep->ep_lock);
2260 	/* The first message can come in before the conn est event */
2261 	if ((ep->ep_rdmacnt == 0) && (ep->ep_state == RDS_EP_STATE_CONNECTED)) {
2262 		ep->ep_rdmacnt++;
2263 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2264 		mutex_exit(&ep->ep_lock);
2265 
2266 		/* send acknowledgement */
2267 		RDS_INCR_TXACKS();
2268 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2269 		if (ret != IBT_SUCCESS) {
2270 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send for "
2271 			    "acknowledgement failed: %d, SQ depth: %d",
2272 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2273 			mutex_enter(&ep->ep_lock);
2274 			ep->ep_rdmacnt--;
2275 			mutex_exit(&ep->ep_lock);
2276 		}
2277 	} else {
2278 		/* no room to send acknowledgement */
2279 		mutex_exit(&ep->ep_lock);
2280 	}
2281 
2282 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2283 }
2284