xref: /titanic_51/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision 0c19630b1592aa30d3e4d9db1a2a8cf9a91c0e72)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #include <sys/stream.h>
76 #include <sys/ib/clients/rds/rdsib_cm.h>
77 #include <sys/ib/clients/rds/rdsib_ib.h>
78 #include <sys/ib/clients/rds/rdsib_buf.h>
79 #include <sys/ib/clients/rds/rdsib_ep.h>
80 #include <sys/ib/clients/rds/rds_kstat.h>
81 #include <sys/zone.h>
82 
83 #define	RDS_POLL_CQ_IN_2TICKS	1
84 
85 /*
86  * This File contains the endpoint related calls
87  */
88 
/* Defined outside this file; only the declarations are needed here */
extern boolean_t rds_islocal(ipaddr_t addr);
extern uint_t rds_wc_signal;

/*
 * Port map qualifiers.  They select which port bitmap the rds_*_port()
 * helpers in this file operate on: the global loopback map, or the
 * local/remote map of a particular session.
 */
#define	RDS_LOOPBACK	0
#define	RDS_LOCAL	1
#define	RDS_REMOTE	2

/* NOTE(review): the use of IBT_IPADDR is not visible in this part of the file */
#define	IBT_IPADDR	1
97 
98 static uint8_t
99 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
100 {
101 	uint8_t	ret;
102 
103 	switch (qualifier) {
104 	case RDS_LOOPBACK: /* loopback */
105 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
106 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
107 		rw_exit(&rds_loopback_portmap_lock);
108 		break;
109 
110 	case RDS_LOCAL: /* Session local */
111 		ASSERT(sp != NULL);
112 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
113 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
114 		rw_exit(&sp->session_local_portmap_lock);
115 		break;
116 
117 	case RDS_REMOTE: /* Session remote */
118 		ASSERT(sp != NULL);
119 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
120 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
121 		rw_exit(&sp->session_remote_portmap_lock);
122 		break;
123 	}
124 
125 	return (ret);
126 }
127 
128 static uint8_t
129 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
130 {
131 	uint8_t	ret;
132 
133 	switch (qualifier) {
134 	case RDS_LOOPBACK: /* loopback */
135 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
136 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
137 		if (!ret) {
138 			/* port is not marked, mark it */
139 			rds_loopback_portmap[port/8] =
140 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
141 		}
142 		rw_exit(&rds_loopback_portmap_lock);
143 		break;
144 
145 	case RDS_LOCAL: /* Session local */
146 		ASSERT(sp != NULL);
147 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
148 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
149 		if (!ret) {
150 			/* port is not marked, mark it */
151 			sp->session_local_portmap[port/8] =
152 			    sp->session_local_portmap[port/8] |
153 			    (1 << (port % 8));
154 		}
155 		rw_exit(&sp->session_local_portmap_lock);
156 		break;
157 
158 	case RDS_REMOTE: /* Session remote */
159 		ASSERT(sp != NULL);
160 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
161 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
162 		if (!ret) {
163 			/* port is not marked, mark it */
164 			sp->session_remote_portmap[port/8] =
165 			    sp->session_remote_portmap[port/8] |
166 			    (1 << (port % 8));
167 		}
168 		rw_exit(&sp->session_remote_portmap_lock);
169 		break;
170 	}
171 
172 	return (ret);
173 }
174 
175 static uint8_t
176 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
177 {
178 	uint8_t	ret;
179 
180 	switch (qualifier) {
181 	case RDS_LOOPBACK: /* loopback */
182 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
183 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
184 		if (ret) {
185 			/* port is marked, unmark it */
186 			rds_loopback_portmap[port/8] =
187 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
188 		}
189 		rw_exit(&rds_loopback_portmap_lock);
190 		break;
191 
192 	case RDS_LOCAL: /* Session local */
193 		ASSERT(sp != NULL);
194 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
195 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
196 		if (ret) {
197 			/* port is marked, unmark it */
198 			sp->session_local_portmap[port/8] =
199 			    sp->session_local_portmap[port/8] &
200 			    ~(1 << (port % 8));
201 		}
202 		rw_exit(&sp->session_local_portmap_lock);
203 		break;
204 
205 	case RDS_REMOTE: /* Session remote */
206 		ASSERT(sp != NULL);
207 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
208 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
209 		if (ret) {
210 			/* port is marked, unmark it */
211 			sp->session_remote_portmap[port/8] =
212 			    sp->session_remote_portmap[port/8] &
213 			    ~(1 << (port % 8));
214 		}
215 		rw_exit(&sp->session_remote_portmap_lock);
216 		break;
217 	}
218 
219 	return (ret);
220 }
221 
222 static void
223 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
224 {
225 	switch (qualifier) {
226 	case RDS_LOOPBACK: /* loopback */
227 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
228 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
229 		rw_exit(&rds_loopback_portmap_lock);
230 		break;
231 
232 	case RDS_LOCAL: /* Session local */
233 		ASSERT(sp != NULL);
234 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
235 		(void) memset(sp->session_local_portmap, 0xFF,
236 		    RDS_PORT_MAP_SIZE);
237 		rw_exit(&sp->session_local_portmap_lock);
238 		break;
239 
240 	case RDS_REMOTE: /* Session remote */
241 		ASSERT(sp != NULL);
242 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
243 		(void) memset(sp->session_remote_portmap, 0xFF,
244 		    RDS_PORT_MAP_SIZE);
245 		rw_exit(&sp->session_remote_portmap_lock);
246 		break;
247 	}
248 }
249 
250 static void
251 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
252 {
253 	switch (qualifier) {
254 	case RDS_LOOPBACK: /* loopback */
255 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
256 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
257 		rw_exit(&rds_loopback_portmap_lock);
258 		break;
259 
260 	case RDS_LOCAL: /* Session local */
261 		ASSERT(sp != NULL);
262 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
263 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
264 		rw_exit(&sp->session_local_portmap_lock);
265 		break;
266 
267 	case RDS_REMOTE: /* Session remote */
268 		ASSERT(sp != NULL);
269 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
270 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
271 		rw_exit(&sp->session_remote_portmap_lock);
272 		break;
273 	}
274 }
275 
276 static boolean_t
277 rds_add_session(rds_session_t *sp, boolean_t locked)
278 {
279 	boolean_t retval = B_TRUE;
280 
281 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
282 
283 	if (!locked) {
284 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
285 	}
286 
287 	/* Don't allow more sessions than configured in rdsib.conf */
288 	if (rdsib_statep->rds_nsessions >= (MaxNodes - 1)) {
289 		RDS_DPRINTF1("rds_add_session", "Max session limit reached");
290 		retval = B_FALSE;
291 	} else {
292 		sp->session_nextp = rdsib_statep->rds_sessionlistp;
293 		rdsib_statep->rds_sessionlistp = sp;
294 		rdsib_statep->rds_nsessions++;
295 		RDS_INCR_SESS();
296 	}
297 
298 	if (!locked) {
299 		rw_exit(&rdsib_statep->rds_sessionlock);
300 	}
301 
302 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
303 
304 	return (retval);
305 }
306 
307 /* Session lookup based on destination IP or destination node guid */
308 rds_session_t *
309 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
310 {
311 	rds_session_t	*sp;
312 
313 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
314 	    remoteip, node_guid);
315 
316 	/* A read/write lock is expected, will panic if none of them are held */
317 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
318 	sp = statep->rds_sessionlistp;
319 	while (sp) {
320 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
321 		    (sp->session_rgid.gid_guid == node_guid))) {
322 			break;
323 		}
324 
325 		sp = sp->session_nextp;
326 	}
327 
328 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
329 
330 	return (sp);
331 }
332 
333 boolean_t
334 rds_session_lkup_by_sp(rds_session_t *sp)
335 {
336 	rds_session_t *sessionp;
337 
338 	RDS_DPRINTF4("rds_session_lkup_by_sp", "Enter: 0x%p", sp);
339 
340 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
341 	sessionp = rdsib_statep->rds_sessionlistp;
342 	while (sessionp) {
343 		if (sessionp == sp) {
344 			rw_exit(&rdsib_statep->rds_sessionlock);
345 			return (B_TRUE);
346 		}
347 
348 		sessionp = sessionp->session_nextp;
349 	}
350 	rw_exit(&rdsib_statep->rds_sessionlock);
351 
352 	return (B_FALSE);
353 }
354 
355 static void
356 rds_ep_fini(rds_ep_t *ep)
357 {
358 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
359 
360 	/* free send pool */
361 	rds_free_send_pool(ep);
362 
363 	/* free recv pool */
364 	rds_free_recv_pool(ep);
365 
366 	mutex_enter(&ep->ep_lock);
367 	ep->ep_hca_guid = 0;
368 	mutex_exit(&ep->ep_lock);
369 
370 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
371 }
372 
373 /* Assumes SP write lock is held */
374 int
375 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
376 {
377 	uint_t		ret;
378 
379 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
380 
381 	/* send pool */
382 	ret = rds_init_send_pool(ep, hca_guid);
383 	if (ret != 0) {
384 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
385 		    ep, ret);
386 		return (-1);
387 	}
388 
389 	/* recv pool */
390 	ret = rds_init_recv_pool(ep);
391 	if (ret != 0) {
392 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
393 		    ep, ret);
394 		rds_free_send_pool(ep);
395 		return (-1);
396 	}
397 
398 	/* reset the ep state */
399 	mutex_enter(&ep->ep_lock);
400 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
401 	ep->ep_hca_guid = hca_guid;
402 	ep->ep_lbufid = NULL;
403 	ep->ep_rbufid = NULL;
404 	ep->ep_segfbp = NULL;
405 	ep->ep_seglbp = NULL;
406 
407 	/* Initialize the WR to send acknowledgements */
408 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
409 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
410 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
411 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
412 	ep->ep_ackwr.wr_nds = 1;
413 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
414 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
415 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
416 	mutex_exit(&ep->ep_lock);
417 
418 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
419 
420 	return (0);
421 }
422 
423 static int
424 rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
425 {
426 	int	ret;
427 
428 	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
429 	    ep, ep->ep_type);
430 
431 	/* Re-initialize send pool */
432 	ret = rds_reinit_send_pool(ep, hca_guid);
433 	if (ret != 0) {
434 		RDS_DPRINTF2("rds_ep_reinit",
435 		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
436 		return (-1);
437 	}
438 
439 	/* free all the receive buffers in the pool */
440 	rds_free_recv_pool(ep);
441 
442 	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
443 	    ep, ep->ep_type);
444 
445 	return (0);
446 }
447 
448 void
449 rds_session_fini(rds_session_t *sp)
450 {
451 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
452 
453 	rds_ep_fini(&sp->session_dataep);
454 	rds_ep_fini(&sp->session_ctrlep);
455 
456 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
457 }
458 
459 /*
460  * Allocate and initialize the resources needed for the control and
461  * data channels
462  */
463 int
464 rds_session_init(rds_session_t *sp)
465 {
466 	int		ret;
467 	rds_hca_t	*hcap;
468 	ib_guid_t	hca_guid;
469 
470 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
471 
472 	/* CALLED WITH SESSION WRITE LOCK */
473 
474 	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
475 	if (hcap == NULL) {
476 		RDS_DPRINTF2("rds_session_init", "SGID is on an uninitialized "
477 		    "HCA: %llx", sp->session_lgid.gid_guid);
478 		return (-1);
479 	}
480 
481 	hca_guid = hcap->hca_guid;
482 	sp->session_hca_guid = hca_guid;
483 
484 	/* allocate and initialize the ctrl channel */
485 	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
486 	if (ret != 0) {
487 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
488 		    "failed", sp, &sp->session_ctrlep);
489 		return (-1);
490 	}
491 
492 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
493 
494 	/* allocate and initialize the data channel */
495 	ret = rds_ep_init(&sp->session_dataep, hca_guid);
496 	if (ret != 0) {
497 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
498 		    "failed", sp, &sp->session_dataep);
499 		rds_ep_fini(&sp->session_ctrlep);
500 		return (-1);
501 	}
502 
503 	/* Clear the portmaps */
504 	rds_unmark_all_ports(sp, RDS_LOCAL);
505 	rds_unmark_all_ports(sp, RDS_REMOTE);
506 
507 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
508 
509 	RDS_DPRINTF2("rds_session_init", "Return");
510 
511 	return (0);
512 }
513 
514 /*
515  * This should be called before moving a session from ERROR state to
516  * INIT state. This will update the HCA keys incase the session has moved from
517  * one HCA to another.
518  */
519 int
520 rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
521 {
522 	rds_hca_t	*hcap, *hcap1;
523 	int		ret;
524 
525 	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p) - state: %d",
526 	    sp, sp->session_state);
527 
528 	/* CALLED WITH SESSION WRITE LOCK */
529 
530 	/* Clear the portmaps */
531 	rds_unmark_all_ports(sp, RDS_LOCAL);
532 	rds_unmark_all_ports(sp, RDS_REMOTE);
533 
534 	/* This should not happen but just a safe guard */
535 	if (sp->session_dataep.ep_ack_addr == NULL) {
536 		RDS_DPRINTF2("rds_session_reinit",
537 		    "ERROR: Unexpected: SP(0x%p) - state: %d",
538 		    sp, sp->session_state);
539 		return (-1);
540 	}
541 
542 	/* make the last buffer as the acknowledged */
543 	*(uintptr_t *)sp->session_dataep.ep_ack_addr =
544 	    (uintptr_t)sp->session_dataep.ep_sndpool.pool_tailp;
545 
546 	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
547 	if (hcap == NULL) {
548 		RDS_DPRINTF2("rds_session_reinit", "SGID is on an "
549 		    "uninitialized HCA: %llx", lgid.gid_guid);
550 		return (-1);
551 	}
552 
553 	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
554 	if (hcap1 == NULL) {
555 		RDS_DPRINTF2("rds_session_reinit", "Seems like HCA %llx "
556 		    "is unplugged", sp->session_lgid.gid_guid);
557 	} else if (hcap->hca_guid == hcap1->hca_guid) {
558 		/*
559 		 * No action is needed as the session did not move across
560 		 * HCAs
561 		 */
562 		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
563 		return (0);
564 	}
565 
566 	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");
567 
568 	sp->session_hca_guid = hcap->hca_guid;
569 
570 	/* re-initialize the control channel */
571 	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
572 	if (ret != 0) {
573 		RDS_DPRINTF2("rds_session_reinit",
574 		    "SP(%p): Ctrl EP(%p) re-initialization failed",
575 		    sp, &sp->session_ctrlep);
576 		return (-1);
577 	}
578 
579 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
580 	    sp, &sp->session_ctrlep);
581 
582 	/* re-initialize the data channel */
583 	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
584 	if (ret != 0) {
585 		RDS_DPRINTF2("rds_session_reinit",
586 		    "SP(%p): Data EP(%p) re-initialization failed",
587 		    sp, &sp->session_dataep);
588 		return (-1);
589 	}
590 
591 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
592 	    sp, &sp->session_dataep);
593 
594 	sp->session_lgid = lgid;
595 
596 	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);
597 
598 	return (0);
599 }
600 
601 static int
602 rds_session_connect(rds_session_t *sp)
603 {
604 	ibt_channel_hdl_t	ctrlchan, datachan;
605 	rds_ep_t		*ep;
606 	int			ret;
607 
608 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
609 
610 	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;
611 
612 	/* Override the packet life time based on the conf file */
613 	if (IBPktLifeTime != 0) {
614 		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
615 		    IBPktLifeTime;
616 	}
617 
618 	/* Session type may change if we run into peer-to-peer case. */
619 	rw_enter(&sp->session_lock, RW_READER);
620 	if (sp->session_type == RDS_SESSION_PASSIVE) {
621 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
622 		    "active end", sp);
623 		rw_exit(&sp->session_lock);
624 		return (0); /* return success */
625 	}
626 	rw_exit(&sp->session_lock);
627 
628 	/* connect the data ep first */
629 	ep = &sp->session_dataep;
630 	mutex_enter(&ep->ep_lock);
631 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
632 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
633 		mutex_exit(&ep->ep_lock);
634 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
635 		    &datachan);
636 		if (ret != IBT_SUCCESS) {
637 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
638 			    "failed: %d", ep, ret);
639 			return (-1);
640 		}
641 		sp->session_dataep.ep_chanhdl = datachan;
642 	} else {
643 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
644 		    "unexpected state: %d", sp, ep, ep->ep_state);
645 		mutex_exit(&ep->ep_lock);
646 		return (-1);
647 	}
648 
649 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
650 	    sp, ep);
651 
652 	ep = &sp->session_ctrlep;
653 	mutex_enter(&ep->ep_lock);
654 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
655 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
656 		mutex_exit(&ep->ep_lock);
657 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
658 		    &ctrlchan);
659 		if (ret != IBT_SUCCESS) {
660 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
661 			    "failed: %d", ep, ret);
662 			return (-1);
663 		}
664 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
665 	} else {
666 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
667 		    "unexpected state: %d", sp, ep, ep->ep_state);
668 		mutex_exit(&ep->ep_lock);
669 		return (-1);
670 	}
671 
672 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
673 	    sp, sp->session_myip, sp->session_remip);
674 
675 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
676 
677 	return (0);
678 }
679 
680 /*
681  * Can be called with or without session_lock.
682  */
683 void
684 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
685 {
686 	rds_ep_t		*ep;
687 
688 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
689 	    sp->session_state);
690 
691 	ep = &sp->session_dataep;
692 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
693 
694 	/* wait until the SQ is empty before closing */
695 	if (wait != 0) {
696 		(void) rds_is_sendq_empty(ep, wait);
697 	}
698 
699 	mutex_enter(&ep->ep_lock);
700 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
701 		mutex_exit(&ep->ep_lock);
702 		delay(drv_usectohz(300000));
703 		mutex_enter(&ep->ep_lock);
704 	}
705 
706 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
707 		ep->ep_state = RDS_EP_STATE_CLOSING;
708 		mutex_exit(&ep->ep_lock);
709 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
710 		if (wait == 0) {
711 			/* make sure all WCs are flushed before proceeding */
712 			(void) rds_is_sendq_empty(ep, 1);
713 		}
714 		mutex_enter(&ep->ep_lock);
715 	}
716 	rds_ep_free_rc_channel(ep);
717 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
718 	ep->ep_segfbp = NULL;
719 	ep->ep_seglbp = NULL;
720 	mutex_exit(&ep->ep_lock);
721 
722 	ep = &sp->session_ctrlep;
723 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
724 
725 	/* wait until the SQ is empty before closing */
726 	if (wait != 0) {
727 		(void) rds_is_sendq_empty(ep, wait);
728 	}
729 
730 	mutex_enter(&ep->ep_lock);
731 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
732 		mutex_exit(&ep->ep_lock);
733 		delay(drv_usectohz(300000));
734 		mutex_enter(&ep->ep_lock);
735 	}
736 
737 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
738 		ep->ep_state = RDS_EP_STATE_CLOSING;
739 		mutex_exit(&ep->ep_lock);
740 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
741 		if (wait == 0) {
742 			/* make sure all WCs are flushed before proceeding */
743 			(void) rds_is_sendq_empty(ep, 1);
744 		}
745 		mutex_enter(&ep->ep_lock);
746 	}
747 	rds_ep_free_rc_channel(ep);
748 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
749 	ep->ep_segfbp = NULL;
750 	ep->ep_seglbp = NULL;
751 	mutex_exit(&ep->ep_lock);
752 
753 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
754 }
755 
756 /* Free the session */
757 static void
758 rds_destroy_session(rds_session_t *sp)
759 {
760 	rds_ep_t	*ep;
761 	rds_bufpool_t	*pool;
762 
763 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
764 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
765 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
766 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
767 
768 	rw_enter(&sp->session_lock, RW_READER);
769 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
770 	    sp->session_state);
771 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
772 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
773 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
774 		rw_exit(&sp->session_lock);
775 		delay(drv_usectohz(1000000));
776 		rw_enter(&sp->session_lock, RW_READER);
777 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
778 		    "ON SESSION", sp, sp->session_state);
779 	}
780 	rw_exit(&sp->session_lock);
781 
782 	/* data channel */
783 	ep = &sp->session_dataep;
784 
785 	/* send pool locks */
786 	pool = &ep->ep_sndpool;
787 	cv_destroy(&pool->pool_cv);
788 	mutex_destroy(&pool->pool_lock);
789 
790 	/* recv pool locks */
791 	pool = &ep->ep_rcvpool;
792 	cv_destroy(&pool->pool_cv);
793 	mutex_destroy(&pool->pool_lock);
794 	mutex_destroy(&ep->ep_recvqp.qp_lock);
795 
796 	/* control channel */
797 	ep = &sp->session_ctrlep;
798 
799 	/* send pool locks */
800 	pool = &ep->ep_sndpool;
801 	cv_destroy(&pool->pool_cv);
802 	mutex_destroy(&pool->pool_lock);
803 
804 	/* recv pool locks */
805 	pool = &ep->ep_rcvpool;
806 	cv_destroy(&pool->pool_cv);
807 	mutex_destroy(&pool->pool_lock);
808 	mutex_destroy(&ep->ep_recvqp.qp_lock);
809 
810 	/* session */
811 	rw_destroy(&sp->session_lock);
812 	rw_destroy(&sp->session_local_portmap_lock);
813 	rw_destroy(&sp->session_remote_portmap_lock);
814 
815 	/* free the session */
816 	kmem_free(sp, sizeof (rds_session_t));
817 
818 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
819 }
820 
821 /* This is called on the taskq thread */
822 void
823 rds_failover_session(void *arg)
824 {
825 	rds_session_t	*sp = (rds_session_t *)arg;
826 	ib_gid_t	lgid, rgid;
827 	ipaddr_t	myip, remip;
828 	int		ret, cnt = 0;
829 	uint8_t		sp_state;
830 
831 	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);
832 
833 	/* Make sure the session is still alive */
834 	if (rds_session_lkup_by_sp(sp) == B_FALSE) {
835 		RDS_DPRINTF2("rds_failover_session",
836 		    "Return: SP(%p) not ALIVE", sp);
837 		return;
838 	}
839 
840 	RDS_INCR_FAILOVERS();
841 
842 	rw_enter(&sp->session_lock, RW_WRITER);
843 	if (sp->session_type != RDS_SESSION_ACTIVE) {
844 		/*
845 		 * The remote side must have seen the error and initiated
846 		 * a re-connect.
847 		 */
848 		RDS_DPRINTF2("rds_failover_session",
849 		    "SP(%p) has become passive", sp);
850 		rw_exit(&sp->session_lock);
851 		return;
852 	}
853 	sp->session_failover = 1;
854 	sp_state = sp->session_state;
855 	rw_exit(&sp->session_lock);
856 
857 	/*
858 	 * The session is in ERROR state but close both channels
859 	 * for a clean start.
860 	 */
861 	if (sp_state == RDS_SESSION_STATE_ERROR) {
862 		rds_session_close(sp, IBT_BLOCKING, 1);
863 	}
864 
865 	/* wait 1 sec before re-connecting */
866 	delay(drv_usectohz(1000000));
867 
868 	do {
869 		ibt_ip_path_attr_t	ipattr;
870 		ibt_ip_addr_t		dstip;
871 
872 		/* The ipaddr should be in the network order */
873 		myip = sp->session_myip;
874 		remip = sp->session_remip;
875 		ret = rds_sc_path_lookup(&myip, &remip);
876 		if (ret == 0) {
877 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
878 			    myip, remip);
879 		}
880 		/* check if we have (new) path from the source to destination */
881 		lgid.gid_prefix = 0;
882 		lgid.gid_guid = 0;
883 		rgid.gid_prefix = 0;
884 		rgid.gid_guid = 0;
885 
886 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
887 		dstip.family = AF_INET;
888 		dstip.un.ip4addr = remip;
889 		ipattr.ipa_dst_ip = &dstip;
890 		ipattr.ipa_src_ip.family = AF_INET;
891 		ipattr.ipa_src_ip.un.ip4addr = myip;
892 		ipattr.ipa_ndst = 1;
893 		ipattr.ipa_max_paths = 1;
894 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
895 		    myip, remip);
896 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
897 		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
898 		if (ret == IBT_SUCCESS) {
899 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
900 			lgid = sp->session_pinfo.
901 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
902 			rgid = sp->session_pinfo.
903 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
904 			break;
905 		}
906 
907 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);
908 
909 		/* wait 1 sec before re-trying */
910 		delay(drv_usectohz(1000000));
911 		cnt++;
912 	} while (cnt < 5);
913 
914 	if (ret != IBT_SUCCESS) {
915 		rw_enter(&sp->session_lock, RW_WRITER);
916 		if (sp->session_type == RDS_SESSION_ACTIVE) {
917 			rds_session_fini(sp);
918 			sp->session_state = RDS_SESSION_STATE_FAILED;
919 			sp->session_failover = 0;
920 			RDS_DPRINTF3("rds_failover_session",
921 			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
922 		} else {
923 			RDS_DPRINTF2("rds_failover_session",
924 			    "SP(%p) has become passive", sp);
925 		}
926 		rw_exit(&sp->session_lock);
927 		return;
928 	}
929 
930 	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
931 	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
932 	    rgid.gid_guid);
933 
934 	rw_enter(&sp->session_lock, RW_WRITER);
935 	if (sp->session_type != RDS_SESSION_ACTIVE) {
936 		/*
937 		 * The remote side must have seen the error and initiated
938 		 * a re-connect.
939 		 */
940 		RDS_DPRINTF2("rds_failover_session",
941 		    "SP(%p) has become passive", sp);
942 		rw_exit(&sp->session_lock);
943 		return;
944 	}
945 
946 	/* move the session to init state */
947 	ret = rds_session_reinit(sp, lgid);
948 	sp->session_lgid = lgid;
949 	sp->session_rgid = rgid;
950 	if (ret != 0) {
951 		rds_session_fini(sp);
952 		sp->session_state = RDS_SESSION_STATE_FAILED;
953 		sp->session_failover = 0;
954 		RDS_DPRINTF3("rds_failover_session",
955 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
956 		rw_exit(&sp->session_lock);
957 		return;
958 	} else {
959 		sp->session_state = RDS_SESSION_STATE_INIT;
960 		RDS_DPRINTF3("rds_failover_session",
961 		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
962 	}
963 	rw_exit(&sp->session_lock);
964 
965 	rds_session_open(sp);
966 
967 	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
968 }
969 
970 void
971 rds_handle_send_error(rds_ep_t *ep)
972 {
973 	if (rds_is_sendq_empty(ep, 0)) {
974 		/* Session should already be in ERROR, try to reconnect */
975 		RDS_DPRINTF2("rds_handle_send_error",
976 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
977 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
978 		    (void *)ep->ep_sp, DDI_SLEEP);
979 	}
980 }
981 
982 /*
983  * Called in the CM handler on the passive side
984  * Called on a taskq thread.
985  */
986 void
987 rds_cleanup_passive_session(void *arg)
988 {
989 	rds_session_t	*sp = arg;
990 
991 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
992 	    sp->session_state);
993 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
994 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
995 
996 	rds_session_close(sp, IBT_BLOCKING, 1);
997 
998 	rw_enter(&sp->session_lock, RW_WRITER);
999 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
1000 		rds_session_fini(sp);
1001 		sp->session_state = RDS_SESSION_STATE_FINI;
1002 		sp->session_failover = 0;
1003 		RDS_DPRINTF3("rds_cleanup_passive_session",
1004 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1005 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
1006 		rds_session_fini(sp);
1007 		sp->session_state = RDS_SESSION_STATE_FAILED;
1008 		sp->session_failover = 0;
1009 		RDS_DPRINTF3("rds_cleanup_passive_session",
1010 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1011 	}
1012 	rw_exit(&sp->session_lock);
1013 
1014 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
1015 }
1016 
1017 /*
1018  * Called by the CM handler on the passive side
1019  * Called with WRITE lock on the session
1020  */
1021 void
1022 rds_passive_session_fini(rds_session_t *sp)
1023 {
1024 	rds_ep_t	*ep;
1025 
1026 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
1027 	    sp->session_state);
1028 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
1029 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
1030 
1031 	/* clean the data channel */
1032 	ep = &sp->session_dataep;
1033 	(void) rds_is_sendq_empty(ep, 1);
1034 	mutex_enter(&ep->ep_lock);
1035 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1036 	    ep->ep_state);
1037 	rds_ep_free_rc_channel(ep);
1038 	mutex_exit(&ep->ep_lock);
1039 
1040 	/* clean the control channel */
1041 	ep = &sp->session_ctrlep;
1042 	(void) rds_is_sendq_empty(ep, 1);
1043 	mutex_enter(&ep->ep_lock);
1044 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1045 	    ep->ep_state);
1046 	rds_ep_free_rc_channel(ep);
1047 	mutex_exit(&ep->ep_lock);
1048 
1049 	rds_session_fini(sp);
1050 	sp->session_failover = 0;
1051 
1052 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
1053 }
1054 
1055 void
1056 rds_close_this_session(rds_session_t *sp, uint8_t wait)
1057 {
1058 	switch (sp->session_state) {
1059 	case RDS_SESSION_STATE_CONNECTED:
1060 		sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1061 		rw_exit(&sp->session_lock);
1062 
1063 		rds_session_close(sp, IBT_BLOCKING, wait);
1064 
1065 		rw_enter(&sp->session_lock, RW_WRITER);
1066 		sp->session_state = RDS_SESSION_STATE_CLOSED;
1067 		RDS_DPRINTF3("rds_close_sessions",
1068 		    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1069 		rds_session_fini(sp);
1070 		sp->session_state = RDS_SESSION_STATE_FINI;
1071 		sp->session_failover = 0;
1072 		RDS_DPRINTF3("rds_close_sessions",
1073 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1074 		break;
1075 
1076 	case RDS_SESSION_STATE_ERROR:
1077 	case RDS_SESSION_STATE_PASSIVE_CLOSING:
1078 	case RDS_SESSION_STATE_INIT:
1079 		sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1080 		rw_exit(&sp->session_lock);
1081 
1082 		rds_session_close(sp, IBT_BLOCKING, wait);
1083 
1084 		rw_enter(&sp->session_lock, RW_WRITER);
1085 		sp->session_state = RDS_SESSION_STATE_CLOSED;
1086 		RDS_DPRINTF3("rds_close_sessions",
1087 		    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1088 		/* FALLTHRU */
1089 	case RDS_SESSION_STATE_CLOSED:
1090 		rds_session_fini(sp);
1091 		sp->session_state = RDS_SESSION_STATE_FINI;
1092 		sp->session_failover = 0;
1093 		RDS_DPRINTF3("rds_close_sessions",
1094 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1095 		break;
1096 	}
1097 }
1098 
1099 /*
1100  * Can be called:
1101  * 1. on driver detach
1102  * 2. on taskq thread
1103  * arg is always NULL
1104  */
1105 /* ARGSUSED */
1106 void
1107 rds_close_sessions(void *arg)
1108 {
1109 	rds_session_t *sp, *spnextp;
1110 
1111 	RDS_DPRINTF2("rds_close_sessions", "Enter");
1112 
1113 	/* wait until all the buffers are freed by the sockets */
1114 	while (RDS_GET_RXPKTS_PEND() != 0) {
1115 		/* wait one second and try again */
1116 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1117 		    "pending packets", RDS_GET_RXPKTS_PEND());
1118 		delay(drv_usectohz(1000000));
1119 	}
1120 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1121 
1122 	/* close all the sessions */
1123 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1124 	sp = rdsib_statep->rds_sessionlistp;
1125 	while (sp) {
1126 		rw_enter(&sp->session_lock, RW_WRITER);
1127 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1128 		    sp->session_state);
1129 		rds_close_this_session(sp, 2);
1130 		rw_exit(&sp->session_lock);
1131 		sp = sp->session_nextp;
1132 	}
1133 
1134 	sp = rdsib_statep->rds_sessionlistp;
1135 	rdsib_statep->rds_sessionlistp = NULL;
1136 	rdsib_statep->rds_nsessions = 0;
1137 	rw_exit(&rdsib_statep->rds_sessionlock);
1138 
1139 	while (sp) {
1140 		spnextp = sp->session_nextp;
1141 		rds_destroy_session(sp);
1142 		RDS_DECR_SESS();
1143 		sp = spnextp;
1144 	}
1145 
1146 	/* free the global pool */
1147 	rds_free_recv_caches(rdsib_statep);
1148 
1149 	RDS_DPRINTF2("rds_close_sessions", "Return");
1150 }
1151 
1152 void
1153 rds_session_open(rds_session_t *sp)
1154 {
1155 	int		ret;
1156 
1157 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1158 
1159 	ret = rds_session_connect(sp);
1160 	if (ret == -1) {
1161 		/*
1162 		 * may be the session has become passive due to
1163 		 * hitting peer-to-peer case
1164 		 */
1165 		rw_enter(&sp->session_lock, RW_READER);
1166 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1167 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1168 			    "has become passive from active", sp);
1169 			rw_exit(&sp->session_lock);
1170 			return;
1171 		}
1172 
1173 		/* get the lock for writing */
1174 		rw_exit(&sp->session_lock);
1175 		rw_enter(&sp->session_lock, RW_WRITER);
1176 		sp->session_state = RDS_SESSION_STATE_ERROR;
1177 		RDS_DPRINTF3("rds_session_open",
1178 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1179 		rw_exit(&sp->session_lock);
1180 
1181 		/* Connect request failed */
1182 		rds_session_close(sp, IBT_BLOCKING, 1);
1183 
1184 		rw_enter(&sp->session_lock, RW_WRITER);
1185 		rds_session_fini(sp);
1186 		sp->session_state = RDS_SESSION_STATE_FAILED;
1187 		sp->session_failover = 0;
1188 		RDS_DPRINTF3("rds_session_open",
1189 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1190 		rw_exit(&sp->session_lock);
1191 
1192 		return;
1193 	}
1194 
1195 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1196 }
1197 
1198 /*
1199  * Creates a session and inserts it into the list of sessions. The session
1200  * state would be CREATED.
1201  * Return Values:
1202  *	EWOULDBLOCK
1203  */
1204 rds_session_t *
1205 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1206     ibt_cm_req_rcv_t *reqp, uint8_t type)
1207 {
1208 	ib_gid_t	lgid, rgid;
1209 	rds_session_t	*newp, *oldp;
1210 	rds_ep_t	*dataep, *ctrlep;
1211 	rds_bufpool_t	*pool;
1212 	int		ret;
1213 
1214 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x, type: %d",
1215 	    statep, localip, remip, type);
1216 
1217 	/* Check if there is space for a new session */
1218 	rw_enter(&statep->rds_sessionlock, RW_READER);
1219 	if (statep->rds_nsessions >= (MaxNodes - 1)) {
1220 		rw_exit(&statep->rds_sessionlock);
1221 		RDS_DPRINTF1("rds_session_create", "No More Sessions allowed");
1222 		return (NULL);
1223 	}
1224 	rw_exit(&statep->rds_sessionlock);
1225 
1226 	/* Allocate and initialize global buffer pool */
1227 	ret = rds_init_recv_caches(statep);
1228 	if (ret != 0) {
1229 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1230 		return (NULL);
1231 	}
1232 
1233 	/* enough memory for session (includes 2 endpoints) */
1234 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1235 
1236 	newp->session_remip = remip;
1237 	newp->session_myip = localip;
1238 	newp->session_type = type;
1239 	newp->session_state = RDS_SESSION_STATE_CREATED;
1240 	RDS_DPRINTF3("rds_session_create",
1241 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1242 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1243 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1244 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1245 
1246 	/* Initialize data endpoint */
1247 	dataep = &newp->session_dataep;
1248 	dataep->ep_remip = newp->session_remip;
1249 	dataep->ep_myip = newp->session_myip;
1250 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1251 	dataep->ep_sp = newp;
1252 	dataep->ep_type = RDS_EP_TYPE_DATA;
1253 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1254 
1255 	/* Initialize send pool locks */
1256 	pool = &dataep->ep_sndpool;
1257 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1258 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1259 
1260 	/* Initialize recv pool locks */
1261 	pool = &dataep->ep_rcvpool;
1262 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1263 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1264 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1265 
1266 	/* Initialize control endpoint */
1267 	ctrlep = &newp->session_ctrlep;
1268 	ctrlep->ep_remip = newp->session_remip;
1269 	ctrlep->ep_myip = newp->session_myip;
1270 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1271 	ctrlep->ep_sp = newp;
1272 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1273 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1274 
1275 	/* Initialize send pool locks */
1276 	pool = &ctrlep->ep_sndpool;
1277 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1278 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1279 
1280 	/* Initialize recv pool locks */
1281 	pool = &ctrlep->ep_rcvpool;
1282 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1283 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1284 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1285 
1286 	/* lkup if there is already a session */
1287 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1288 	oldp = rds_session_lkup(statep, remip, 0);
1289 	if (oldp != NULL) {
1290 		/* A session to this destination exists */
1291 		rw_exit(&statep->rds_sessionlock);
1292 		rw_destroy(&newp->session_lock);
1293 		rw_destroy(&newp->session_local_portmap_lock);
1294 		rw_destroy(&newp->session_remote_portmap_lock);
1295 		mutex_destroy(&dataep->ep_lock);
1296 		mutex_destroy(&ctrlep->ep_lock);
1297 		kmem_free(newp, sizeof (rds_session_t));
1298 		return (NULL);
1299 	}
1300 
1301 	/* Insert this session into the list */
1302 	if (rds_add_session(newp, B_TRUE) != B_TRUE) {
1303 		/* No room to add this session */
1304 		rw_exit(&statep->rds_sessionlock);
1305 		rw_destroy(&newp->session_lock);
1306 		rw_destroy(&newp->session_local_portmap_lock);
1307 		rw_destroy(&newp->session_remote_portmap_lock);
1308 		mutex_destroy(&dataep->ep_lock);
1309 		mutex_destroy(&ctrlep->ep_lock);
1310 		kmem_free(newp, sizeof (rds_session_t));
1311 		return (NULL);
1312 	}
1313 
1314 	/* unlock the session list */
1315 	rw_exit(&statep->rds_sessionlock);
1316 
1317 	if (type == RDS_SESSION_ACTIVE) {
1318 		ipaddr_t		localip1, remip1;
1319 		ibt_ip_path_attr_t	ipattr;
1320 		ibt_ip_addr_t		dstip;
1321 
1322 		/* The ipaddr should be in the network order */
1323 		localip1 = localip;
1324 		remip1 = remip;
1325 		ret = rds_sc_path_lookup(&localip1, &remip1);
1326 		if (ret == 0) {
1327 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1328 			    localip, remip);
1329 		}
1330 
1331 		/* Get the gids for the source and destination ip addrs */
1332 		lgid.gid_prefix = 0;
1333 		lgid.gid_guid = 0;
1334 		rgid.gid_prefix = 0;
1335 		rgid.gid_guid = 0;
1336 
1337 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1338 		dstip.family = AF_INET;
1339 		dstip.un.ip4addr = remip1;
1340 		ipattr.ipa_dst_ip = &dstip;
1341 		ipattr.ipa_src_ip.family = AF_INET;
1342 		ipattr.ipa_src_ip.un.ip4addr = localip1;
1343 		ipattr.ipa_ndst = 1;
1344 		ipattr.ipa_max_paths = 1;
1345 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1346 		    localip1, remip1);
1347 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1348 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1349 		    NULL, NULL);
1350 		if (ret != IBT_SUCCESS) {
1351 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths failed, ret: %d "
1352 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1353 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1354 
1355 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1356 			return (NULL);
1357 		}
1358 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1359 		lgid =
1360 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1361 		rgid =
1362 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1363 
1364 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1365 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1366 		    rgid.gid_guid);
1367 	}
1368 
1369 	rw_enter(&newp->session_lock, RW_WRITER);
1370 	/* check for peer-to-peer case */
1371 	if (type == newp->session_type) {
1372 		/* no peer-to-peer case */
1373 		if (type == RDS_SESSION_ACTIVE) {
1374 			newp->session_lgid = lgid;
1375 			newp->session_rgid = rgid;
1376 		} else {
1377 			/* rgid is requester gid & lgid is receiver gid */
1378 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1379 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1380 		}
1381 	}
1382 	rw_exit(&newp->session_lock);
1383 
1384 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1385 
1386 	return (newp);
1387 }
1388 
1389 void
1390 rds_handle_close_session_request(void *arg)
1391 {
1392 	rds_session_t	*sp = (rds_session_t *)arg;
1393 
1394 	RDS_DPRINTF2("rds_handle_close_session_request",
1395 	    "Enter: Closing this Session (%p)", sp);
1396 
1397 	rw_enter(&sp->session_lock, RW_WRITER);
1398 	RDS_DPRINTF2("rds_handle_close_session_request",
1399 	    "SP(%p) State: %d", sp, sp->session_state);
1400 	rds_close_this_session(sp, 2);
1401 	rw_exit(&sp->session_lock);
1402 
1403 	RDS_DPRINTF2("rds_handle_close_session_request", "Return SP(%p)", sp);
1404 }
1405 
1406 void
1407 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1408 {
1409 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1410 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1411 
1412 	switch (cpkt->rcp_code) {
1413 	case RDS_CTRL_CODE_STALL:
1414 		RDS_INCR_STALLS_RCVD();
1415 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1416 		break;
1417 	case RDS_CTRL_CODE_UNSTALL:
1418 		RDS_INCR_UNSTALLS_RCVD();
1419 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1420 		break;
1421 	case RDS_CTRL_CODE_STALL_PORTS:
1422 		rds_mark_all_ports(sp, RDS_REMOTE);
1423 		break;
1424 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1425 		rds_unmark_all_ports(sp, RDS_REMOTE);
1426 		break;
1427 	case RDS_CTRL_CODE_HEARTBEAT:
1428 		break;
1429 	case RDS_CTRL_CODE_CLOSE_SESSION:
1430 		RDS_DPRINTF2("rds_handle_control_message",
1431 		    "SP(%p) Remote Requested to close this session", sp);
1432 		(void) ddi_taskq_dispatch(rds_taskq,
1433 		    rds_handle_close_session_request, (void *)sp, DDI_SLEEP);
1434 		break;
1435 	default:
1436 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1437 		    cpkt->rcp_code);
1438 		break;
1439 	}
1440 
1441 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1442 }
1443 
1444 int
1445 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1446 {
1447 	ibt_send_wr_t	wr;
1448 	rds_ep_t	*ep;
1449 	rds_buf_t	*bp;
1450 	rds_ctrl_pkt_t	*cp;
1451 	int		ret;
1452 
1453 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1454 	    "Port: %d", sp, code, port);
1455 
1456 	ep = &sp->session_ctrlep;
1457 
1458 	bp = rds_get_send_buf(ep, 1);
1459 	if (bp == NULL) {
1460 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1461 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1462 		    port);
1463 		return (-1);
1464 	}
1465 
1466 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1467 	cp->rcp_code = code;
1468 	cp->rcp_port = port;
1469 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1470 
1471 	wr.wr_id = (uintptr_t)bp;
1472 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1473 	wr.wr_trans = IBT_RC_SRV;
1474 	wr.wr_opcode = IBT_WRC_SEND;
1475 	wr.wr_nds = 1;
1476 	wr.wr_sgl = &bp->buf_ds;
1477 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1478 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1479 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1480 	if (ret != IBT_SUCCESS) {
1481 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1482 		    "%d", ep, ret);
1483 		bp->buf_state = RDS_SNDBUF_FREE;
1484 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1485 		return (-1);
1486 	}
1487 
1488 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1489 	    "Port: %d", sp, code, port);
1490 
1491 	return (0);
1492 }
1493 
1494 void
1495 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1496 {
1497 	int		ret;
1498 
1499 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1500 
1501 	RDS_INCR_STALLS_TRIGGERED();
1502 
1503 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1504 
1505 		if (sp != NULL) {
1506 			ret = rds_post_control_message(sp,
1507 			    RDS_CTRL_CODE_STALL, port);
1508 			if (ret != 0) {
1509 				(void) rds_check_n_unmark_port(sp, port,
1510 				    qualifier);
1511 				return;
1512 			}
1513 			RDS_INCR_STALLS_SENT();
1514 		}
1515 	} else {
1516 		RDS_DPRINTF3(LABEL,
1517 		    "Port %d is already in stall state", port);
1518 	}
1519 
1520 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1521 }
1522 
1523 void
1524 rds_resume_port(in_port_t port)
1525 {
1526 	rds_session_t	*sp;
1527 	uint_t		ix;
1528 	int		ret;
1529 
1530 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1531 
1532 	RDS_INCR_UNSTALLS_TRIGGERED();
1533 
1534 	/* resume loopback traffic */
1535 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1536 
1537 	/* send unstall messages to resume the remote traffic */
1538 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1539 
1540 	sp = rdsib_statep->rds_sessionlistp;
1541 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1542 		ASSERT(sp != NULL);
1543 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1544 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1545 				ret = rds_post_control_message(sp,
1546 				    RDS_CTRL_CODE_UNSTALL, port);
1547 				if (ret != 0) {
1548 					(void) rds_check_n_mark_port(sp, port,
1549 					    RDS_LOCAL);
1550 				} else {
1551 					RDS_INCR_UNSTALLS_SENT();
1552 				}
1553 		}
1554 
1555 		sp = sp->session_nextp;
1556 	}
1557 
1558 	rw_exit(&rdsib_statep->rds_sessionlock);
1559 
1560 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1561 }
1562 
1563 static int
1564 rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1565     in_port_t recvport)
1566 {
1567 	ibt_send_wr_t	*wrp, wr;
1568 	rds_buf_t	*bp, *bp1;
1569 	rds_data_hdr_t	*pktp;
1570 	uint32_t	msgsize, npkts, residual, pktno, ix;
1571 	int		ret;
1572 
1573 	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
1574 	    ep, uiop);
1575 
1576 	/* how many pkts are needed to carry this msg */
1577 	msgsize = uiop->uio_resid;
1578 	npkts = ((msgsize - 1) / UserBufferSize) + 1;
1579 	residual = ((msgsize - 1) % UserBufferSize) + 1;
1580 
1581 	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
1582 	    msgsize, npkts);
1583 
1584 	/* Get the buffers needed to post this message */
1585 	bp = rds_get_send_buf(ep, npkts);
1586 	if (bp == NULL) {
1587 		RDS_INCR_ENOBUFS();
1588 		return (ENOBUFS);
1589 	}
1590 
1591 	if (npkts > 1) {
1592 		/*
1593 		 * multi-pkt messages are posted at the same time as a list
1594 		 * of WRs
1595 		 */
1596 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
1597 		    npkts, KM_SLEEP);
1598 	}
1599 
1600 
1601 	pktno = 0;
1602 	bp1 = bp;
1603 	do {
1604 		/* prepare the header */
1605 		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1606 		pktp->dh_datalen = UserBufferSize;
1607 		pktp->dh_npkts = npkts - pktno;
1608 		pktp->dh_psn = pktno;
1609 		pktp->dh_sendport = sendport;
1610 		pktp->dh_recvport = recvport;
1611 		bp1->buf_ds.ds_len = RdsPktSize;
1612 
1613 		/* copy the data */
1614 		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
1615 		    UserBufferSize, UIO_WRITE, uiop);
1616 		if (ret != 0) {
1617 			break;
1618 		}
1619 
1620 		if (uiop->uio_resid == 0) {
1621 			pktp->dh_datalen = residual;
1622 			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
1623 			break;
1624 		}
1625 		pktno++;
1626 		bp1 = bp1->buf_nextp;
1627 	} while (uiop->uio_resid);
1628 
1629 	if (ret) {
1630 		/* uiomove failed */
1631 		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
1632 		    uiop, ret);
1633 		if (npkts > 1) {
1634 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1635 		}
1636 		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1637 		return (ret);
1638 	}
1639 
1640 	if (npkts > 1) {
1641 		/* multi-pkt message */
1642 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);
1643 
1644 		bp1 = bp;
1645 		for (ix = 0; ix < npkts; ix++) {
1646 			wrp[ix].wr_id = (uintptr_t)bp1;
1647 			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
1648 			wrp[ix].wr_trans = IBT_RC_SRV;
1649 			wrp[ix].wr_opcode = IBT_WRC_SEND;
1650 			wrp[ix].wr_nds = 1;
1651 			wrp[ix].wr_sgl = &bp1->buf_ds;
1652 			bp1 = bp1->buf_nextp;
1653 		}
1654 		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;
1655 
1656 		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
1657 		if (ret != IBT_SUCCESS) {
1658 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1659 			    "%d for %d pkts", ep, ret, npkts);
1660 			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1661 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1662 			return (ret);
1663 		}
1664 
1665 		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1666 	} else {
1667 		/* single pkt */
1668 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
1669 		wr.wr_id = (uintptr_t)bp;
1670 		wr.wr_flags = IBT_WR_SEND_SOLICIT;
1671 		wr.wr_trans = IBT_RC_SRV;
1672 		wr.wr_opcode = IBT_WRC_SEND;
1673 		wr.wr_nds = 1;
1674 		wr.wr_sgl = &bp->buf_ds;
1675 		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
1676 		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
1677 		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1678 		if (ret != IBT_SUCCESS) {
1679 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1680 			    "%d", ep, ret);
1681 			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1682 			return (ret);
1683 		}
1684 	}
1685 
1686 	RDS_INCR_TXPKTS(npkts);
1687 	RDS_INCR_TXBYTES(msgsize);
1688 
1689 	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
1690 	    ep, uiop);
1691 
1692 	return (0);
1693 }
1694 
1695 static int
1696 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1697     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1698 {
1699 	mblk_t		*mp;
1700 	int		ret;
1701 
1702 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1703 
1704 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1705 	    "%d to recvport: %d", sendport, recvport);
1706 
1707 	mp = allocb(uiop->uio_resid, BPRI_MED);
1708 	if (mp == NULL) {
1709 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1710 		    uiop->uio_resid);
1711 		return (ENOSPC);
1712 	}
1713 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1714 
1715 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1716 	if (ret) {
1717 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1718 		freeb(mp);
1719 		return (ret);
1720 	}
1721 
1722 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1723 	    zoneid);
1724 	if (ret != 0) {
1725 		if (ret == ENOSPC) {
1726 			/*
1727 			 * The message is delivered but cannot take more,
1728 			 * stop further loopback traffic to this port
1729 			 */
1730 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1731 			    "Port %d NO SPACE", recvport);
1732 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1733 		} else {
1734 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1735 			    "port %d failed: %d", sendport, recvport, ret);
1736 			return (ret);
1737 		}
1738 	}
1739 
1740 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1741 	return (0);
1742 }
1743 
1744 static void
1745 rds_resend_messages(void *arg)
1746 {
1747 	rds_session_t	*sp = (rds_session_t *)arg;
1748 	rds_ep_t	*ep;
1749 	rds_bufpool_t	*spool;
1750 	rds_buf_t	*bp, *endp, *tmp;
1751 	ibt_send_wr_t	*wrp;
1752 	uint_t		nwr = 0, ix, jx;
1753 	int		ret;
1754 
1755 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1756 
1757 	ep = &sp->session_dataep;
1758 
1759 	spool = &ep->ep_sndpool;
1760 	mutex_enter(&spool->pool_lock);
1761 
1762 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1763 
1764 	if (ep->ep_lbufid == NULL) {
1765 		RDS_DPRINTF2("rds_resend_messages",
1766 		    "SP(%p) Remote session is cleaned up ", sp);
1767 		/*
1768 		 * The remote end cleaned up its session. There may be loss
1769 		 * of messages. Mark all buffers as acknowledged.
1770 		 */
1771 		tmp = spool->pool_tailp;
1772 	} else {
1773 		tmp = (rds_buf_t *)ep->ep_lbufid;
1774 		RDS_DPRINTF2("rds_resend_messages",
1775 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1776 	}
1777 
1778 	endp = spool->pool_tailp;
1779 	bp = spool->pool_headp;
1780 	jx = 0;
1781 	while ((bp != NULL) && (bp != tmp)) {
1782 		bp->buf_state = RDS_SNDBUF_FREE;
1783 		jx++;
1784 		bp = bp->buf_nextp;
1785 	}
1786 
1787 	if (bp == NULL) {
1788 		mutex_exit(&spool->pool_lock);
1789 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1790 		    "found in the list", tmp);
1791 
1792 		rw_enter(&sp->session_lock, RW_WRITER);
1793 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1794 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1795 		} else {
1796 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1797 			    "Expected State: %d", sp, sp->session_state,
1798 			    RDS_SESSION_STATE_CONNECTED);
1799 		}
1800 		sp->session_failover = 0;
1801 		rw_exit(&sp->session_lock);
1802 		return;
1803 	}
1804 
1805 	/* Found the match */
1806 	bp->buf_state = RDS_SNDBUF_FREE;
1807 	jx++;
1808 
1809 	spool->pool_tailp = bp;
1810 	bp = bp->buf_nextp;
1811 	spool->pool_tailp->buf_nextp = NULL;
1812 	nwr = spool->pool_nfree - jx;
1813 	spool->pool_nfree = jx;
1814 	mutex_exit(&spool->pool_lock);
1815 
1816 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1817 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1818 
1819 	if (bp) {
1820 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1821 		    KM_SLEEP);
1822 
1823 		while (nwr) {
1824 			jx = (nwr > 100) ? 100 : nwr;
1825 
1826 			tmp = bp;
1827 			for (ix = 0; ix < jx; ix++) {
1828 				bp->buf_state = RDS_SNDBUF_PENDING;
1829 				wrp[ix].wr_id = (uintptr_t)bp;
1830 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1831 				wrp[ix].wr_trans = IBT_RC_SRV;
1832 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1833 				wrp[ix].wr_nds = 1;
1834 				wrp[ix].wr_sgl = &bp->buf_ds;
1835 				bp = bp->buf_nextp;
1836 			}
1837 
1838 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1839 			if (ret != IBT_SUCCESS) {
1840 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1841 				    "failed: %d for % pkts", ep, ret, jx);
1842 				break;
1843 			}
1844 
1845 			mutex_enter(&spool->pool_lock);
1846 			spool->pool_nbusy += jx;
1847 			mutex_exit(&spool->pool_lock);
1848 
1849 			nwr -= jx;
1850 		}
1851 
1852 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1853 
1854 		if (nwr != 0) {
1855 
1856 			/*
1857 			 * An error while failover is in progress. Some WRs are
1858 			 * posted while other remain. If any of the posted WRs
1859 			 * complete in error then they would dispatch a taskq to
1860 			 * do a failover. Getting the session lock will prevent
1861 			 * the taskq to wait until we are done here.
1862 			 */
1863 			rw_enter(&sp->session_lock, RW_READER);
1864 
1865 			/*
1866 			 * Wait until all the previous WRs are completed and
1867 			 * then queue the remaining, otherwise the order of
1868 			 * the messages may change.
1869 			 */
1870 			(void) rds_is_sendq_empty(ep, 1);
1871 
1872 			/* free the remaining buffers */
1873 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1874 
1875 			rw_exit(&sp->session_lock);
1876 			return;
1877 		}
1878 	}
1879 
1880 	rw_enter(&sp->session_lock, RW_WRITER);
1881 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1882 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1883 	} else {
1884 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1885 		    "Expected State: %d", sp, sp->session_state,
1886 		    RDS_SESSION_STATE_CONNECTED);
1887 	}
1888 	sp->session_failover = 0;
1889 	rw_exit(&sp->session_lock);
1890 
1891 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1892 }
1893 
1894 /*
1895  * This is called when a channel is connected. Transition the session to
1896  * CONNECTED state iff both channels are connected.
1897  */
1898 void
1899 rds_session_active(rds_session_t *sp)
1900 {
1901 	rds_ep_t	*ep;
1902 	uint_t		failover;
1903 
1904 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1905 
1906 	rw_enter(&sp->session_lock, RW_READER);
1907 
1908 	failover = sp->session_failover;
1909 
1910 	/*
1911 	 * we establish the data channel first, so check the control channel
1912 	 * first but make sure it is initialized.
1913 	 */
1914 	ep = &sp->session_ctrlep;
1915 	mutex_enter(&ep->ep_lock);
1916 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1917 		/* the session is not ready yet */
1918 		mutex_exit(&ep->ep_lock);
1919 		rw_exit(&sp->session_lock);
1920 		return;
1921 	}
1922 	mutex_exit(&ep->ep_lock);
1923 
1924 	/* control channel is connected, check the data channel */
1925 	ep = &sp->session_dataep;
1926 	mutex_enter(&ep->ep_lock);
1927 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1928 		/* data channel is not yet connected */
1929 		mutex_exit(&ep->ep_lock);
1930 		rw_exit(&sp->session_lock);
1931 		return;
1932 	}
1933 	mutex_exit(&ep->ep_lock);
1934 
1935 	if (failover) {
1936 		rw_exit(&sp->session_lock);
1937 
1938 		/*
1939 		 * The session has failed over. Previous msgs have to be
1940 		 * re-sent before the session is moved to the connected
1941 		 * state.
1942 		 */
1943 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1944 		    "to re-send messages", sp);
1945 		(void) ddi_taskq_dispatch(rds_taskq,
1946 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1947 		return;
1948 	}
1949 
1950 	/* the session is ready */
1951 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1952 	RDS_DPRINTF3("rds_session_active",
1953 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1954 
1955 	rw_exit(&sp->session_lock);
1956 
1957 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1958 }
1959 
1960 static int
1961 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1962     in_port_t recvport)
1963 {
1964 	int	ret;
1965 
1966 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1967 	    "%d", ep, sendport, recvport);
1968 
1969 	/* make sure the remote port is not stalled */
1970 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1971 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1972 		    ep->ep_sp, recvport);
1973 		RDS_INCR_EWOULDBLOCK();
1974 		ret = ENOMEM;
1975 	} else {
1976 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1977 	}
1978 
1979 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1980 
1981 	return (ret);
1982 }
1983 
1984 /* Send a message to a destination socket */
1985 int
1986 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1987     in_port_t recvport, zoneid_t zoneid)
1988 {
1989 	rds_session_t	*sp;
1990 	ib_gid_t	lgid, rgid;
1991 	int		ret;
1992 
1993 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1994 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1995 	    sendport, recvport);
1996 
1997 	/* If msg length is 0, just return success */
1998 	if (uiop->uio_resid == 0) {
1999 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
2000 		return (0);
2001 	}
2002 
2003 	/* Is there a session to the destination? */
2004 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
2005 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
2006 	rw_exit(&rdsib_statep->rds_sessionlock);
2007 
2008 	/* Is this a loopback message? */
2009 	if ((sp == NULL) && (rds_islocal(recvip))) {
2010 		/* make sure the port is not stalled */
2011 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
2012 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
2013 			    recvport);
2014 			RDS_INCR_EWOULDBLOCK();
2015 			return (ENOMEM);
2016 		}
2017 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
2018 		    sendport, zoneid);
2019 		return (ret);
2020 	}
2021 
2022 	/* Not a loopback message */
2023 	if (sp == NULL) {
2024 		/* There is no session to the destination, create one. */
2025 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
2026 		    "IP: 0x%x", recvip);
2027 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
2028 		    RDS_SESSION_ACTIVE);
2029 		if (sp != NULL) {
2030 			rw_enter(&sp->session_lock, RW_WRITER);
2031 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2032 				ret = rds_session_init(sp);
2033 				if (ret != 0) {
2034 					RDS_DPRINTF2("rds_sendmsg",
2035 					    "SP(%p): rds_session_init failed",
2036 					    sp);
2037 					sp->session_state =
2038 					    RDS_SESSION_STATE_FAILED;
2039 					RDS_DPRINTF3("rds_sendmsg",
2040 					    "SP(%p) State "
2041 					    "RDS_SESSION_STATE_FAILED", sp);
2042 					rw_exit(&sp->session_lock);
2043 					return (EFAULT);
2044 				}
2045 				sp->session_state = RDS_SESSION_STATE_INIT;
2046 				RDS_DPRINTF3("rds_sendmsg",
2047 				    "SP(%p) State "
2048 				    "RDS_SESSION_STATE_INIT", sp);
2049 				rw_exit(&sp->session_lock);
2050 				rds_session_open(sp);
2051 			} else {
2052 				rw_exit(&sp->session_lock);
2053 			}
2054 		} else {
2055 			/* Is a session created for this destination */
2056 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
2057 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
2058 			rw_exit(&rdsib_statep->rds_sessionlock);
2059 			if (sp == NULL) {
2060 				return (EFAULT);
2061 			}
2062 		}
2063 	}
2064 
2065 	/* There is a session to the destination */
2066 	rw_enter(&sp->session_lock, RW_READER);
2067 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2068 		rw_exit(&sp->session_lock);
2069 
2070 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2071 		    recvport);
2072 		return (ret);
2073 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2074 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2075 		ipaddr_t sendip1, recvip1;
2076 
2077 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
2078 		    "%d", sp, sp->session_state);
2079 		rw_exit(&sp->session_lock);
2080 		rw_enter(&sp->session_lock, RW_WRITER);
2081 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2082 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2083 			ibt_ip_path_attr_t	ipattr;
2084 			ibt_ip_addr_t		dstip;
2085 
2086 			sp->session_state = RDS_SESSION_STATE_CREATED;
2087 			sp->session_type = RDS_SESSION_ACTIVE;
2088 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
2089 			    "RDS_SESSION_STATE_CREATED", sp);
2090 			rw_exit(&sp->session_lock);
2091 
2092 
2093 			/* The ipaddr should be in the network order */
2094 			sendip1 = sendip;
2095 			recvip1 = recvip;
2096 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
2097 			if (ret == 0) {
2098 				RDS_DPRINTF2(LABEL, "Path not found "
2099 				    "(0x%x 0x%x)", sendip1, recvip1);
2100 			}
2101 
2102 			/* Resolve the IP addresses */
2103 			lgid.gid_prefix = 0;
2104 			lgid.gid_guid = 0;
2105 			rgid.gid_prefix = 0;
2106 			rgid.gid_guid = 0;
2107 
2108 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
2109 			dstip.family = AF_INET;
2110 			dstip.un.ip4addr = recvip1;
2111 			ipattr.ipa_dst_ip = &dstip;
2112 			ipattr.ipa_src_ip.family = AF_INET;
2113 			ipattr.ipa_src_ip.un.ip4addr = sendip1;
2114 			ipattr.ipa_ndst = 1;
2115 			ipattr.ipa_max_paths = 1;
2116 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2117 			    sendip1, recvip1);
2118 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2119 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2120 			    NULL, NULL);
2121 			if (ret != IBT_SUCCESS) {
2122 				RDS_DPRINTF2("rds_sendmsg",
2123 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2124 
2125 				rw_enter(&sp->session_lock, RW_WRITER);
2126 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2127 					sp->session_state =
2128 					    RDS_SESSION_STATE_FAILED;
2129 					RDS_DPRINTF3("rds_sendmsg",
2130 					    "SP(%p) State "
2131 					    "RDS_SESSION_STATE_FAILED", sp);
2132 					rw_exit(&sp->session_lock);
2133 					return (EFAULT);
2134 				} else {
2135 					rw_exit(&sp->session_lock);
2136 					return (ENOMEM);
2137 				}
2138 			}
2139 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2140 			lgid = sp->session_pinfo.
2141 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2142 			rgid = sp->session_pinfo.
2143 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2144 
2145 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2146 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2147 			    rgid.gid_guid);
2148 
2149 			rw_enter(&sp->session_lock, RW_WRITER);
2150 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2151 				sp->session_lgid = lgid;
2152 				sp->session_rgid = rgid;
2153 				ret = rds_session_init(sp);
2154 				if (ret != 0) {
2155 					RDS_DPRINTF2("rds_sendmsg",
2156 					    "SP(%p): rds_session_init failed",
2157 					    sp);
2158 					sp->session_state =
2159 					    RDS_SESSION_STATE_FAILED;
2160 					RDS_DPRINTF3("rds_sendmsg",
2161 					    "SP(%p) State "
2162 					    "RDS_SESSION_STATE_FAILED", sp);
2163 					rw_exit(&sp->session_lock);
2164 					return (EFAULT);
2165 				}
2166 				sp->session_state = RDS_SESSION_STATE_INIT;
2167 				rw_exit(&sp->session_lock);
2168 
2169 				rds_session_open(sp);
2170 
2171 			} else {
2172 				RDS_DPRINTF2("rds_sendmsg",
2173 				    "SP(%p): type changed to %d",
2174 				    sp, sp->session_type);
2175 				rw_exit(&sp->session_lock);
2176 				return (ENOMEM);
2177 			}
2178 		} else {
2179 			RDS_DPRINTF2("rds_sendmsg",
2180 			    "SP(%p): Session state %d changed",
2181 			    sp, sp->session_state);
2182 			rw_exit(&sp->session_lock);
2183 			return (ENOMEM);
2184 		}
2185 	} else {
2186 		RDS_DPRINTF4("rds_sendmsg", "SP(%p): Session is in %d state",
2187 		    sp, sp->session_state);
2188 		rw_exit(&sp->session_lock);
2189 		return (ENOMEM);
2190 	}
2191 
2192 	rw_enter(&sp->session_lock, RW_READER);
2193 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2194 		rw_exit(&sp->session_lock);
2195 
2196 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2197 		    recvport);
2198 	} else {
2199 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2200 		    sp, sp->session_state);
2201 		rw_exit(&sp->session_lock);
2202 	}
2203 
2204 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2205 
2206 	return (ret);
2207 }
2208 
2209 /* Note: This is called on the CQ handler thread */
2210 void
2211 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2212 {
2213 	mblk_t		*mp, *mp1;
2214 	rds_data_hdr_t	*pktp, *pktp1;
2215 	uint8_t		*datap;
2216 	rds_buf_t	*bp1;
2217 	rds_bufpool_t	*rpool;
2218 	uint_t		npkts, ix;
2219 	int		ret;
2220 
2221 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2222 
2223 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2224 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2225 	npkts = pktp->dh_npkts;
2226 
2227 	/* increment rx pending here */
2228 	rpool = &ep->ep_rcvpool;
2229 	mutex_enter(&rpool->pool_lock);
2230 	rpool->pool_nbusy += npkts;
2231 	mutex_exit(&rpool->pool_lock);
2232 
2233 	/* this will get freed by sockfs */
2234 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2235 	if (mp == NULL) {
2236 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2237 		    ep, bp);
2238 		rds_free_recv_buf(bp, npkts);
2239 		return;
2240 	}
2241 	mp->b_wptr = datap + pktp->dh_datalen;
2242 	mp->b_datap->db_type = M_DATA;
2243 
2244 	mp1 = mp;
2245 	bp1 = bp->buf_nextp;
2246 	while (bp1 != NULL) {
2247 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2248 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2249 		    RDS_DATA_HDR_SZ;
2250 
2251 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2252 		    BPRI_HI, &bp1->buf_frtn);
2253 		if (mp1->b_cont == NULL) {
2254 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2255 			    ep, bp1);
2256 			freemsg(mp);
2257 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2258 			return;
2259 		}
2260 		mp1 = mp1->b_cont;
2261 		mp1->b_wptr = datap + pktp1->dh_datalen;
2262 		mp1->b_datap->db_type = M_DATA;
2263 
2264 		bp1 = bp1->buf_nextp;
2265 	}
2266 
2267 	RDS_INCR_RXPKTS_PEND(npkts);
2268 	RDS_INCR_RXPKTS(npkts);
2269 	RDS_INCR_RXBYTES(msgdsize(mp));
2270 
2271 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2272 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2273 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2274 	    npkts, pktp->dh_psn);
2275 
2276 	/* store the last buffer id, no lock needed */
2277 	if (npkts > 1) {
2278 		ep->ep_rbufid = pktp1->dh_bufid;
2279 	} else {
2280 		ep->ep_rbufid = pktp->dh_bufid;
2281 	}
2282 
2283 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2284 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2285 	if (ret != 0) {
2286 		if (ret == ENOSPC) {
2287 			/*
2288 			 * The message is delivered but cannot take more,
2289 			 * stop further remote messages coming to this port
2290 			 */
2291 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2292 			    pktp->dh_recvport);
2293 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2294 		} else {
2295 			RDS_DPRINTF2(LABEL, "rds_deliver_new_msg returned: %d",
2296 			    ret);
2297 		}
2298 	}
2299 
2300 	mutex_enter(&ep->ep_lock);
2301 	/* The first message can come in before the conn est event */
2302 	if ((ep->ep_rdmacnt == 0) && (ep->ep_state == RDS_EP_STATE_CONNECTED)) {
2303 		ep->ep_rdmacnt++;
2304 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2305 		mutex_exit(&ep->ep_lock);
2306 
2307 		/* send acknowledgement */
2308 		RDS_INCR_TXACKS();
2309 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2310 		if (ret != IBT_SUCCESS) {
2311 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send for "
2312 			    "acknowledgement failed: %d, SQ depth: %d",
2313 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2314 			mutex_enter(&ep->ep_lock);
2315 			ep->ep_rdmacnt--;
2316 			mutex_exit(&ep->ep_lock);
2317 		}
2318 	} else {
2319 		/* no room to send acknowledgement */
2320 		mutex_exit(&ep->ep_lock);
2321 	}
2322 
2323 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2324 }
2325