xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision eb2bd6624e082e367f66e2b0fdfe54c9b5d493af)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #pragma ident	"%Z%%M%	%I%	%E% SMI"
76 
77 #include <sys/stream.h>
78 #include <sys/ib/clients/rds/rdsib_cm.h>
79 #include <sys/ib/clients/rds/rdsib_ib.h>
80 #include <sys/ib/clients/rds/rdsib_buf.h>
81 #include <sys/ib/clients/rds/rdsib_ep.h>
82 #include <sys/ib/clients/rds/rds_kstat.h>
83 #include <sys/zone.h>
84 
85 #define	RDS_POLL_CQ_IN_2TICKS	1
86 
/*
 * This file contains the session and endpoint related routines.
 */
90 
91 extern boolean_t rds_islocal(ipaddr_t addr);
92 extern uint_t rds_wc_signal;
93 
94 #define	RDS_LOOPBACK	0
95 #define	RDS_LOCAL	1
96 #define	RDS_REMOTE	2
97 
98 #define	IBT_IPADDR	1
99 
100 static uint8_t
101 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
102 {
103 	uint8_t	ret;
104 
105 	switch (qualifier) {
106 	case RDS_LOOPBACK: /* loopback */
107 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
108 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
109 		rw_exit(&rds_loopback_portmap_lock);
110 		break;
111 
112 	case RDS_LOCAL: /* Session local */
113 		ASSERT(sp != NULL);
114 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
115 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
116 		rw_exit(&sp->session_local_portmap_lock);
117 		break;
118 
119 	case RDS_REMOTE: /* Session remote */
120 		ASSERT(sp != NULL);
121 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
122 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
123 		rw_exit(&sp->session_remote_portmap_lock);
124 		break;
125 	}
126 
127 	return (ret);
128 }
129 
130 static uint8_t
131 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
132 {
133 	uint8_t	ret;
134 
135 	switch (qualifier) {
136 	case RDS_LOOPBACK: /* loopback */
137 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
138 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
139 		if (!ret) {
140 			/* port is not marked, mark it */
141 			rds_loopback_portmap[port/8] =
142 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
143 		}
144 		rw_exit(&rds_loopback_portmap_lock);
145 		break;
146 
147 	case RDS_LOCAL: /* Session local */
148 		ASSERT(sp != NULL);
149 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
150 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
151 		if (!ret) {
152 			/* port is not marked, mark it */
153 			sp->session_local_portmap[port/8] =
154 			    sp->session_local_portmap[port/8] |
155 			    (1 << (port % 8));
156 		}
157 		rw_exit(&sp->session_local_portmap_lock);
158 		break;
159 
160 	case RDS_REMOTE: /* Session remote */
161 		ASSERT(sp != NULL);
162 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
163 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
164 		if (!ret) {
165 			/* port is not marked, mark it */
166 			sp->session_remote_portmap[port/8] =
167 			    sp->session_remote_portmap[port/8] |
168 			    (1 << (port % 8));
169 		}
170 		rw_exit(&sp->session_remote_portmap_lock);
171 		break;
172 	}
173 
174 	return (ret);
175 }
176 
177 static uint8_t
178 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
179 {
180 	uint8_t	ret;
181 
182 	switch (qualifier) {
183 	case RDS_LOOPBACK: /* loopback */
184 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
185 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
186 		if (ret) {
187 			/* port is marked, unmark it */
188 			rds_loopback_portmap[port/8] =
189 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
190 		}
191 		rw_exit(&rds_loopback_portmap_lock);
192 		break;
193 
194 	case RDS_LOCAL: /* Session local */
195 		ASSERT(sp != NULL);
196 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
197 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
198 		if (ret) {
199 			/* port is marked, unmark it */
200 			sp->session_local_portmap[port/8] =
201 			    sp->session_local_portmap[port/8] &
202 			    ~(1 << (port % 8));
203 		}
204 		rw_exit(&sp->session_local_portmap_lock);
205 		break;
206 
207 	case RDS_REMOTE: /* Session remote */
208 		ASSERT(sp != NULL);
209 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
210 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
211 		if (ret) {
212 			/* port is marked, unmark it */
213 			sp->session_remote_portmap[port/8] =
214 			    sp->session_remote_portmap[port/8] &
215 			    ~(1 << (port % 8));
216 		}
217 		rw_exit(&sp->session_remote_portmap_lock);
218 		break;
219 	}
220 
221 	return (ret);
222 }
223 
224 static void
225 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
226 {
227 	switch (qualifier) {
228 	case RDS_LOOPBACK: /* loopback */
229 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
230 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
231 		rw_exit(&rds_loopback_portmap_lock);
232 		break;
233 
234 	case RDS_LOCAL: /* Session local */
235 		ASSERT(sp != NULL);
236 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
237 		(void) memset(sp->session_local_portmap, 0xFF,
238 		    RDS_PORT_MAP_SIZE);
239 		rw_exit(&sp->session_local_portmap_lock);
240 		break;
241 
242 	case RDS_REMOTE: /* Session remote */
243 		ASSERT(sp != NULL);
244 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
245 		(void) memset(sp->session_remote_portmap, 0xFF,
246 		    RDS_PORT_MAP_SIZE);
247 		rw_exit(&sp->session_remote_portmap_lock);
248 		break;
249 	}
250 }
251 
252 static void
253 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
254 {
255 	switch (qualifier) {
256 	case RDS_LOOPBACK: /* loopback */
257 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
258 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
259 		rw_exit(&rds_loopback_portmap_lock);
260 		break;
261 
262 	case RDS_LOCAL: /* Session local */
263 		ASSERT(sp != NULL);
264 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
265 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
266 		rw_exit(&sp->session_local_portmap_lock);
267 		break;
268 
269 	case RDS_REMOTE: /* Session remote */
270 		ASSERT(sp != NULL);
271 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
272 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
273 		rw_exit(&sp->session_remote_portmap_lock);
274 		break;
275 	}
276 }
277 
278 static void
279 rds_add_session(rds_session_t *sp, boolean_t locked)
280 {
281 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
282 
283 	if (!locked) {
284 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
285 	}
286 
287 	sp->session_nextp = rdsib_statep->rds_sessionlistp;
288 	rdsib_statep->rds_sessionlistp = sp;
289 	rdsib_statep->rds_nsessions++;
290 
291 	if (!locked) {
292 		rw_exit(&rdsib_statep->rds_sessionlock);
293 	}
294 	RDS_INCR_SESS();
295 
296 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
297 }
298 
299 /* Session lookup based on destination IP or destination node guid */
300 rds_session_t *
301 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
302 {
303 	rds_session_t	*sp;
304 
305 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
306 	    remoteip, node_guid);
307 
308 	/* A read/write lock is expected, will panic if none of them are held */
309 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
310 	sp = statep->rds_sessionlistp;
311 	while (sp) {
312 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
313 		    (sp->session_rgid.gid_guid == node_guid))) {
314 			break;
315 		}
316 
317 		sp = sp->session_nextp;
318 	}
319 
320 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
321 
322 	return (sp);
323 }
324 
325 boolean_t
326 rds_session_lkup_by_sp(rds_session_t *sp)
327 {
328 	rds_session_t *sessionp;
329 
330 	RDS_DPRINTF4("rds_session_lkup_by_sp", "Enter: 0x%p", sp);
331 
332 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
333 	sessionp = rdsib_statep->rds_sessionlistp;
334 	while (sessionp) {
335 		if (sessionp == sp) {
336 			rw_exit(&rdsib_statep->rds_sessionlock);
337 			return (B_TRUE);
338 		}
339 
340 		sessionp = sessionp->session_nextp;
341 	}
342 	rw_exit(&rdsib_statep->rds_sessionlock);
343 
344 	return (B_FALSE);
345 }
346 
347 static void
348 rds_ep_fini(rds_ep_t *ep)
349 {
350 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
351 
352 	/* free send pool */
353 	rds_free_send_pool(ep);
354 
355 	/* free recv pool */
356 	rds_free_recv_pool(ep);
357 
358 	mutex_enter(&ep->ep_lock);
359 	ep->ep_hca_guid = 0;
360 	mutex_exit(&ep->ep_lock);
361 
362 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
363 }
364 
365 /* Assumes SP write lock is held */
366 int
367 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
368 {
369 	uint_t		ret;
370 
371 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
372 
373 	/* send pool */
374 	ret = rds_init_send_pool(ep, hca_guid);
375 	if (ret != 0) {
376 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
377 		    ep, ret);
378 		return (-1);
379 	}
380 
381 	/* recv pool */
382 	ret = rds_init_recv_pool(ep);
383 	if (ret != 0) {
384 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
385 		    ep, ret);
386 		rds_free_send_pool(ep);
387 		return (-1);
388 	}
389 
390 	/* reset the ep state */
391 	mutex_enter(&ep->ep_lock);
392 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
393 	ep->ep_hca_guid = hca_guid;
394 	ep->ep_lbufid = NULL;
395 	ep->ep_rbufid = NULL;
396 	ep->ep_segfbp = NULL;
397 	ep->ep_seglbp = NULL;
398 
399 	/* Initialize the WR to send acknowledgements */
400 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
401 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
402 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
403 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
404 	ep->ep_ackwr.wr_nds = 1;
405 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
406 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
407 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
408 	mutex_exit(&ep->ep_lock);
409 
410 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
411 
412 	return (0);
413 }
414 
415 static int
416 rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
417 {
418 	int	ret;
419 
420 	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
421 	    ep, ep->ep_type);
422 
423 	/* Re-initialize send pool */
424 	ret = rds_reinit_send_pool(ep, hca_guid);
425 	if (ret != 0) {
426 		RDS_DPRINTF2("rds_ep_reinit",
427 		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
428 		return (-1);
429 	}
430 
431 	/* free all the receive buffers in the pool */
432 	rds_free_recv_pool(ep);
433 
434 	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
435 	    ep, ep->ep_type);
436 
437 	return (0);
438 }
439 
440 void
441 rds_session_fini(rds_session_t *sp)
442 {
443 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
444 
445 	rds_ep_fini(&sp->session_dataep);
446 	rds_ep_fini(&sp->session_ctrlep);
447 
448 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
449 }
450 
451 /*
452  * Allocate and initialize the resources needed for the control and
453  * data channels
454  */
455 int
456 rds_session_init(rds_session_t *sp)
457 {
458 	int		ret;
459 	rds_hca_t	*hcap;
460 	ib_guid_t	hca_guid;
461 
462 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
463 
464 	/* CALLED WITH SESSION WRITE LOCK */
465 
466 	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
467 	if (hcap == NULL) {
468 		RDS_DPRINTF1("rds_session_init", "SGID is on an uninitialized "
469 		    "HCA: %llx", sp->session_lgid.gid_guid);
470 		return (-1);
471 	}
472 
473 	hca_guid = hcap->hca_guid;
474 
475 	/* allocate and initialize the ctrl channel */
476 	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
477 	if (ret != 0) {
478 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
479 		    "failed", sp, &sp->session_ctrlep);
480 		return (-1);
481 	}
482 
483 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
484 
485 	/* allocate and initialize the data channel */
486 	ret = rds_ep_init(&sp->session_dataep, hca_guid);
487 	if (ret != 0) {
488 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
489 		    "failed", sp, &sp->session_dataep);
490 		rds_ep_fini(&sp->session_ctrlep);
491 		return (-1);
492 	}
493 
494 	/* Clear the portmaps */
495 	rds_unmark_all_ports(sp, RDS_LOCAL);
496 	rds_unmark_all_ports(sp, RDS_REMOTE);
497 
498 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
499 
500 	RDS_DPRINTF2("rds_session_init", "Return");
501 
502 	return (0);
503 }
504 
505 /*
506  * This should be called before moving a session from ERROR state to
507  * INIT state. This will update the HCA keys incase the session has moved from
508  * one HCA to another.
509  */
510 int
511 rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
512 {
513 	rds_hca_t	*hcap, *hcap1;
514 	int		ret;
515 
516 	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p)", sp);
517 
518 	/* CALLED WITH SESSION WRITE LOCK */
519 
520 	/* Clear the portmaps */
521 	rds_unmark_all_ports(sp, RDS_LOCAL);
522 	rds_unmark_all_ports(sp, RDS_REMOTE);
523 
524 	/* make the last buffer as the acknowledged */
525 	*(uintptr_t *)sp->session_dataep.ep_ack_addr =
526 	    (uintptr_t)sp->session_dataep.ep_sndpool.pool_tailp;
527 
528 	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
529 	if (hcap == NULL) {
530 		RDS_DPRINTF1("rds_session_reinit", "SGID is on an "
531 		    "uninitialized HCA: %llx", lgid.gid_guid);
532 		return (-1);
533 	}
534 
535 	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
536 	if (hcap1 == NULL) {
537 		RDS_DPRINTF1("rds_session_reinit", "Seems like HCA %llx "
538 		    "is unplugged", sp->session_lgid.gid_guid);
539 	} else if (hcap->hca_guid == hcap1->hca_guid) {
540 		/*
541 		 * No action is needed as the session did not move across
542 		 * HCAs
543 		 */
544 		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
545 		return (0);
546 	}
547 
548 	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");
549 
550 	/* re-initialize the control channel */
551 	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
552 	if (ret != 0) {
553 		RDS_DPRINTF2("rds_session_reinit",
554 		    "SP(%p): Ctrl EP(%p) re-initialization failed",
555 		    sp, &sp->session_ctrlep);
556 		return (-1);
557 	}
558 
559 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
560 	    sp, &sp->session_ctrlep);
561 
562 	/* re-initialize the data channel */
563 	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
564 	if (ret != 0) {
565 		RDS_DPRINTF2("rds_session_reinit",
566 		    "SP(%p): Data EP(%p) re-initialization failed",
567 		    sp, &sp->session_dataep);
568 		return (-1);
569 	}
570 
571 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
572 	    sp, &sp->session_dataep);
573 
574 	sp->session_lgid = lgid;
575 
576 	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);
577 
578 	return (0);
579 }
580 
581 static int
582 rds_session_connect(rds_session_t *sp)
583 {
584 	ibt_channel_hdl_t	ctrlchan, datachan;
585 	rds_ep_t		*ep;
586 	int			ret;
587 
588 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
589 
590 	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;
591 
592 	/* Override the packet life time based on the conf file */
593 	if (IBPktLifeTime != 0) {
594 		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
595 		    IBPktLifeTime;
596 	}
597 
598 	/* Session type may change if we run into peer-to-peer case. */
599 	rw_enter(&sp->session_lock, RW_READER);
600 	if (sp->session_type == RDS_SESSION_PASSIVE) {
601 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
602 		    "active end", sp);
603 		rw_exit(&sp->session_lock);
604 		return (0); /* return success */
605 	}
606 	rw_exit(&sp->session_lock);
607 
608 	/* connect the data ep first */
609 	ep = &sp->session_dataep;
610 	mutex_enter(&ep->ep_lock);
611 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
612 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
613 		mutex_exit(&ep->ep_lock);
614 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
615 		    &datachan);
616 		if (ret != IBT_SUCCESS) {
617 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
618 			    "failed: %d", ep, ret);
619 			return (-1);
620 		}
621 		sp->session_dataep.ep_chanhdl = datachan;
622 	} else {
623 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
624 		    "unexpected state: %d", sp, ep, ep->ep_state);
625 		mutex_exit(&ep->ep_lock);
626 		return (-1);
627 	}
628 
629 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
630 	    sp, ep);
631 
632 	ep = &sp->session_ctrlep;
633 	mutex_enter(&ep->ep_lock);
634 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
635 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
636 		mutex_exit(&ep->ep_lock);
637 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
638 		    &ctrlchan);
639 		if (ret != IBT_SUCCESS) {
640 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
641 			    "failed: %d", ep, ret);
642 			return (-1);
643 		}
644 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
645 	} else {
646 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
647 		    "unexpected state: %d", sp, ep, ep->ep_state);
648 		mutex_exit(&ep->ep_lock);
649 		return (-1);
650 	}
651 
652 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
653 	    sp, sp->session_myip, sp->session_remip);
654 
655 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
656 
657 	return (0);
658 }
659 
660 /*
661  * Can be called with or without session_lock.
662  */
663 void
664 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
665 {
666 	rds_ep_t		*ep;
667 
668 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
669 	    sp->session_state);
670 
671 	ep = &sp->session_dataep;
672 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
673 
674 	/* wait until the SQ is empty before closing */
675 	if (wait != 0) {
676 		(void) rds_is_sendq_empty(ep, wait);
677 	}
678 
679 	mutex_enter(&ep->ep_lock);
680 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
681 		mutex_exit(&ep->ep_lock);
682 		delay(drv_usectohz(300000));
683 		mutex_enter(&ep->ep_lock);
684 	}
685 
686 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
687 		ep->ep_state = RDS_EP_STATE_CLOSING;
688 		mutex_exit(&ep->ep_lock);
689 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
690 		if (wait == 0) {
691 			/* make sure all WCs are flushed before proceeding */
692 			(void) rds_is_sendq_empty(ep, 1);
693 		}
694 		mutex_enter(&ep->ep_lock);
695 	}
696 	rds_ep_free_rc_channel(ep);
697 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
698 	ep->ep_segfbp = NULL;
699 	ep->ep_seglbp = NULL;
700 	mutex_exit(&ep->ep_lock);
701 
702 	ep = &sp->session_ctrlep;
703 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
704 
705 	/* wait until the SQ is empty before closing */
706 	if (wait != 0) {
707 		(void) rds_is_sendq_empty(ep, wait);
708 	}
709 
710 	mutex_enter(&ep->ep_lock);
711 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
712 		mutex_exit(&ep->ep_lock);
713 		delay(drv_usectohz(300000));
714 		mutex_enter(&ep->ep_lock);
715 	}
716 
717 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
718 		ep->ep_state = RDS_EP_STATE_CLOSING;
719 		mutex_exit(&ep->ep_lock);
720 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
721 		if (wait == 0) {
722 			/* make sure all WCs are flushed before proceeding */
723 			(void) rds_is_sendq_empty(ep, 1);
724 		}
725 		mutex_enter(&ep->ep_lock);
726 	}
727 	rds_ep_free_rc_channel(ep);
728 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
729 	ep->ep_segfbp = NULL;
730 	ep->ep_seglbp = NULL;
731 	mutex_exit(&ep->ep_lock);
732 
733 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
734 }
735 
736 /* Free the session */
737 static void
738 rds_destroy_session(rds_session_t *sp)
739 {
740 	rds_ep_t	*ep;
741 	rds_bufpool_t	*pool;
742 
743 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
744 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
745 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
746 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
747 
748 	rw_enter(&sp->session_lock, RW_READER);
749 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
750 	    sp->session_state);
751 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
752 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
753 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
754 		rw_exit(&sp->session_lock);
755 		delay(drv_usectohz(1000000));
756 		rw_enter(&sp->session_lock, RW_READER);
757 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
758 		    "ON SESSION", sp, sp->session_state);
759 	}
760 	rw_exit(&sp->session_lock);
761 
762 	/* data channel */
763 	ep = &sp->session_dataep;
764 
765 	/* send pool locks */
766 	pool = &ep->ep_sndpool;
767 	cv_destroy(&pool->pool_cv);
768 	mutex_destroy(&pool->pool_lock);
769 
770 	/* recv pool locks */
771 	pool = &ep->ep_rcvpool;
772 	cv_destroy(&pool->pool_cv);
773 	mutex_destroy(&pool->pool_lock);
774 	mutex_destroy(&ep->ep_recvqp.qp_lock);
775 
776 	/* control channel */
777 	ep = &sp->session_ctrlep;
778 
779 	/* send pool locks */
780 	pool = &ep->ep_sndpool;
781 	cv_destroy(&pool->pool_cv);
782 	mutex_destroy(&pool->pool_lock);
783 
784 	/* recv pool locks */
785 	pool = &ep->ep_rcvpool;
786 	cv_destroy(&pool->pool_cv);
787 	mutex_destroy(&pool->pool_lock);
788 	mutex_destroy(&ep->ep_recvqp.qp_lock);
789 
790 	/* session */
791 	rw_destroy(&sp->session_lock);
792 	rw_destroy(&sp->session_local_portmap_lock);
793 	rw_destroy(&sp->session_remote_portmap_lock);
794 
795 	/* free the session */
796 	kmem_free(sp, sizeof (rds_session_t));
797 
798 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
799 }
800 
/*
 * Try to re-establish a session after an error.
 *
 * arg is the session (rds_session_t *) to fail over.
 *
 * Sequence:
 *   1. Return if the session is no longer on the global session list or is
 *      not the active side.
 *   2. Set session_failover, close both channels and wait one second.
 *   3. Look up an IP path from the session's local to its remote address,
 *      trying up to 5 times with a one second pause after each failure.
 *   4. If no path is found and the session is still active, finalize it
 *      and move it to the FAILED state.
 *   5. Otherwise re-initialize the session for the new local GID, move it
 *      to the INIT state and open it again. If the re-initialization fails
 *      the session is finalized and moved to the FAILED state.
 *
 * The function returns early, without changing the session state, whenever
 * it finds that the session is no longer of type RDS_SESSION_ACTIVE.
 *
 * This is called on the taskq thread.
 */
static void
rds_failover_session(void *arg)
{
	rds_session_t	*sp = (rds_session_t *)arg;
	ib_gid_t	lgid, rgid;
	ipaddr_t	myip, remip;
	int		ret, cnt = 0;

	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);

	/* Make sure the session is still alive */
	if (rds_session_lkup_by_sp(sp) == B_FALSE) {
		RDS_DPRINTF2("rds_failover_session",
		    "Return: SP(%p) not ALIVE", sp);
		return;
	}

	RDS_INCR_FAILOVERS();

	rw_enter(&sp->session_lock, RW_WRITER);
	if (sp->session_type != RDS_SESSION_ACTIVE) {
		/*
		 * The remote side must have seen the error and initiated
		 * a re-connect.
		 */
		RDS_DPRINTF2("rds_failover_session",
		    "SP(%p) has become passive", sp);
		rw_exit(&sp->session_lock);
		return;
	}
	/* flag that a failover is in progress for this session */
	sp->session_failover = 1;
	rw_exit(&sp->session_lock);

	/*
	 * The session is in ERROR state but close both channels
	 * for a clean start.
	 */
	rds_session_close(sp, IBT_BLOCKING, 1);

	/* wait 1 sec before re-connecting */
	delay(drv_usectohz(1000000));

	/* path lookup, retried up to 5 times */
	do {
		ibt_ip_path_attr_t	ipattr;
		ibt_ip_addr_t		dstip;

		/* The ipaddr should be in the network order */
		myip = sp->session_myip;
		remip = sp->session_remip;
		/*
		 * A zero return is only logged; the lookup below is done
		 * regardless. myip and remip are passed by address, so
		 * presumably they may be updated by this call - TODO confirm.
		 */
		ret = rds_sc_path_lookup(&myip, &remip);
		if (ret == 0) {
			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
			    myip, remip);
		}
		/* check if we have (new) path from the source to destination */
		lgid.gid_prefix = 0;
		lgid.gid_guid = 0;
		rgid.gid_prefix = 0;
		rgid.gid_guid = 0;

		/* ask for a single IPv4 path from myip to remip */
		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
		dstip.family = AF_INET;
		dstip.un.ip4addr = htonl(remip);
		ipattr.ipa_dst_ip = &dstip;
		ipattr.ipa_src_ip.family = AF_INET;
		ipattr.ipa_src_ip.un.ip4addr = htonl(myip);
		ipattr.ipa_ndst = 1;
		ipattr.ipa_max_paths = 1;
		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
		    myip, remip);
		/* the path found is stored directly in sp->session_pinfo */
		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
		if (ret == IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
			/* remember the source and destination GIDs */
			lgid = sp->session_pinfo.
			    pi_prim_cep_path.cep_adds_vect.av_sgid;
			rgid = sp->session_pinfo.
			    pi_prim_cep_path.cep_adds_vect.av_dgid;
			break;
		}

		RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);

		/* wait 1 sec before re-trying */
		delay(drv_usectohz(1000000));
		cnt++;
	} while (cnt < 5);

	/* ret holds the result of the last ibt_get_ip_paths() attempt */
	if (ret != IBT_SUCCESS) {
		rw_enter(&sp->session_lock, RW_WRITER);
		if (sp->session_type == RDS_SESSION_ACTIVE) {
			/* no path: give up on this session */
			rds_session_fini(sp);
			sp->session_state = RDS_SESSION_STATE_FAILED;
			sp->session_failover = 0;
			RDS_DPRINTF3("rds_failover_session",
			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
		} else {
			RDS_DPRINTF2("rds_failover_session",
			    "SP(%p) has become passive", sp);
		}
		rw_exit(&sp->session_lock);
		return;
	}

	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
	    rgid.gid_guid);

	rw_enter(&sp->session_lock, RW_WRITER);
	if (sp->session_type != RDS_SESSION_ACTIVE) {
		/*
		 * The remote side must have seen the error and initiated
		 * a re-connect.
		 */
		RDS_DPRINTF2("rds_failover_session",
		    "SP(%p) has become passive", sp);
		rw_exit(&sp->session_lock);
		return;
	}

	/* move the session to init state */
	ret = rds_session_reinit(sp, lgid);
	/* the new GIDs are recorded whether or not the reinit succeeded */
	sp->session_lgid = lgid;
	sp->session_rgid = rgid;
	if (ret != 0) {
		rds_session_fini(sp);
		sp->session_state = RDS_SESSION_STATE_FAILED;
		sp->session_failover = 0;
		RDS_DPRINTF3("rds_failover_session",
		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
		rw_exit(&sp->session_lock);
		return;
	} else {
		sp->session_state = RDS_SESSION_STATE_INIT;
		RDS_DPRINTF3("rds_failover_session",
		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
	}
	rw_exit(&sp->session_lock);

	/* the session lock is dropped before the session is opened again */
	rds_session_open(sp);

	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
}
945 
946 void
947 rds_handle_send_error(rds_ep_t *ep)
948 {
949 	if (rds_is_sendq_empty(ep, 0)) {
950 		/* Session should already be in ERROR, try to reconnect */
951 		RDS_DPRINTF2("rds_handle_send_error",
952 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
953 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
954 		    (void *)ep->ep_sp, DDI_SLEEP);
955 	}
956 }
957 
958 /*
959  * Called in the CM handler on the passive side
960  * Called on a taskq thread.
961  */
962 void
963 rds_cleanup_passive_session(void *arg)
964 {
965 	rds_session_t	*sp = arg;
966 
967 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
968 	    sp->session_state);
969 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
970 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
971 
972 	rds_session_close(sp, IBT_BLOCKING, 1);
973 
974 	rw_enter(&sp->session_lock, RW_WRITER);
975 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
976 		rds_session_fini(sp);
977 		sp->session_state = RDS_SESSION_STATE_FINI;
978 		sp->session_failover = 0;
979 		RDS_DPRINTF3("rds_cleanup_passive_session",
980 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
981 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
982 		rds_session_fini(sp);
983 		sp->session_state = RDS_SESSION_STATE_FAILED;
984 		sp->session_failover = 0;
985 		RDS_DPRINTF3("rds_cleanup_passive_session",
986 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
987 	}
988 	rw_exit(&sp->session_lock);
989 
990 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
991 }
992 
993 /*
994  * Called by the CM handler on the passive side
995  * Called with WRITE lock on the session
996  */
997 void
998 rds_passive_session_fini(rds_session_t *sp)
999 {
1000 	rds_ep_t	*ep;
1001 
1002 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
1003 	    sp->session_state);
1004 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
1005 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
1006 
1007 	/* clean the data channel */
1008 	ep = &sp->session_dataep;
1009 	(void) rds_is_sendq_empty(ep, 1);
1010 	mutex_enter(&ep->ep_lock);
1011 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1012 	    ep->ep_state);
1013 	rds_ep_free_rc_channel(ep);
1014 	mutex_exit(&ep->ep_lock);
1015 
1016 	/* clean the control channel */
1017 	ep = &sp->session_ctrlep;
1018 	(void) rds_is_sendq_empty(ep, 1);
1019 	mutex_enter(&ep->ep_lock);
1020 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1021 	    ep->ep_state);
1022 	rds_ep_free_rc_channel(ep);
1023 	mutex_exit(&ep->ep_lock);
1024 
1025 	rds_session_fini(sp);
1026 	sp->session_failover = 0;
1027 
1028 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
1029 }
1030 
1031 /*
1032  * Can be called:
1033  * 1. on driver detach
1034  * 2. on taskq thread
1035  * arg is always NULL
1036  */
1037 /* ARGSUSED */
1038 void
1039 rds_close_sessions(void *arg)
1040 {
1041 	rds_session_t *sp, *spnextp;
1042 
1043 	RDS_DPRINTF2("rds_close_sessions", "Enter");
1044 
1045 	/* wait until all the buffers are freed by the sockets */
1046 	while (RDS_GET_RXPKTS_PEND() != 0) {
1047 		/* wait one second and try again */
1048 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1049 		    "pending packets", RDS_GET_RXPKTS_PEND());
1050 		delay(drv_usectohz(1000000));
1051 	}
1052 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1053 
1054 	/* close all the sessions */
1055 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1056 	sp = rdsib_statep->rds_sessionlistp;
1057 	while (sp) {
1058 		rw_enter(&sp->session_lock, RW_WRITER);
1059 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1060 		    sp->session_state);
1061 
1062 		switch (sp->session_state) {
1063 		case RDS_SESSION_STATE_CONNECTED:
1064 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1065 			rw_exit(&sp->session_lock);
1066 
1067 			rds_session_close(sp, IBT_BLOCKING, 1);
1068 
1069 			rw_enter(&sp->session_lock, RW_WRITER);
1070 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1071 			RDS_DPRINTF3("rds_close_sessions",
1072 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1073 			rds_session_fini(sp);
1074 			sp->session_state = RDS_SESSION_STATE_FINI;
1075 			sp->session_failover = 0;
1076 			RDS_DPRINTF3("rds_close_sessions",
1077 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1078 			break;
1079 
1080 		case RDS_SESSION_STATE_ERROR:
1081 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
1082 		case RDS_SESSION_STATE_INIT:
1083 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1084 			rw_exit(&sp->session_lock);
1085 
1086 			rds_session_close(sp, IBT_BLOCKING, 1);
1087 
1088 			rw_enter(&sp->session_lock, RW_WRITER);
1089 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1090 			RDS_DPRINTF3("rds_close_sessions",
1091 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1092 			/* FALLTHRU */
1093 		case RDS_SESSION_STATE_CLOSED:
1094 			rds_session_fini(sp);
1095 			sp->session_state = RDS_SESSION_STATE_FINI;
1096 			sp->session_failover = 0;
1097 			RDS_DPRINTF3("rds_close_sessions",
1098 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1099 			break;
1100 		}
1101 
1102 		rw_exit(&sp->session_lock);
1103 		sp = sp->session_nextp;
1104 	}
1105 
1106 	sp = rdsib_statep->rds_sessionlistp;
1107 	rdsib_statep->rds_sessionlistp = NULL;
1108 	rdsib_statep->rds_nsessions = 0;
1109 	rw_exit(&rdsib_statep->rds_sessionlock);
1110 
1111 	while (sp) {
1112 		spnextp = sp->session_nextp;
1113 		rds_destroy_session(sp);
1114 		RDS_DECR_SESS();
1115 		sp = spnextp;
1116 	}
1117 
1118 	/* free the global pool */
1119 	rds_free_recv_caches(rdsib_statep);
1120 
1121 	RDS_DPRINTF2("rds_close_sessions", "Return");
1122 }
1123 
1124 void
1125 rds_session_open(rds_session_t *sp)
1126 {
1127 	int		ret;
1128 
1129 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1130 
1131 	ret = rds_session_connect(sp);
1132 	if (ret == -1) {
1133 		/*
1134 		 * may be the session has become passive due to
1135 		 * hitting peer-to-peer case
1136 		 */
1137 		rw_enter(&sp->session_lock, RW_READER);
1138 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1139 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1140 			    "has become passive from active", sp);
1141 			rw_exit(&sp->session_lock);
1142 			return;
1143 		}
1144 
1145 		/* get the lock for writing */
1146 		rw_exit(&sp->session_lock);
1147 		rw_enter(&sp->session_lock, RW_WRITER);
1148 		sp->session_state = RDS_SESSION_STATE_ERROR;
1149 		RDS_DPRINTF3("rds_session_open",
1150 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1151 		rw_exit(&sp->session_lock);
1152 
1153 		/* Connect request failed */
1154 		rds_session_close(sp, IBT_BLOCKING, 1);
1155 
1156 		rw_enter(&sp->session_lock, RW_WRITER);
1157 		rds_session_fini(sp);
1158 		sp->session_state = RDS_SESSION_STATE_FAILED;
1159 		sp->session_failover = 0;
1160 		RDS_DPRINTF3("rds_session_open",
1161 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1162 		rw_exit(&sp->session_lock);
1163 
1164 		return;
1165 	}
1166 
1167 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1168 }
1169 
1170 /*
1171  * Creates a session and inserts it into the list of sessions. The session
1172  * state would be CREATED.
1173  * Return Values:
1174  *	EWOULDBLOCK
1175  */
1176 rds_session_t *
1177 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1178     ibt_cm_req_rcv_t *reqp, uint8_t type)
1179 {
1180 	ib_gid_t	lgid, rgid;
1181 	rds_session_t	*newp, *oldp;
1182 	rds_ep_t	*dataep, *ctrlep;
1183 	rds_bufpool_t	*pool;
1184 	int		ret;
1185 
1186 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x, type: %d",
1187 	    statep, localip, remip, type);
1188 
1189 	/* Allocate and initialize global buffer pool */
1190 	ret = rds_init_recv_caches(statep);
1191 	if (ret != 0) {
1192 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1193 		return (NULL);
1194 	}
1195 
1196 	/* enough memory for session (includes 2 endpoints) */
1197 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1198 
1199 	newp->session_remip = remip;
1200 	newp->session_myip = localip;
1201 	newp->session_type = type;
1202 	newp->session_state = RDS_SESSION_STATE_CREATED;
1203 	RDS_DPRINTF3("rds_session_create",
1204 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1205 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1206 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1207 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1208 
1209 	/* Initialize data endpoint */
1210 	dataep = &newp->session_dataep;
1211 	dataep->ep_remip = newp->session_remip;
1212 	dataep->ep_myip = newp->session_myip;
1213 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1214 	dataep->ep_sp = newp;
1215 	dataep->ep_type = RDS_EP_TYPE_DATA;
1216 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1217 
1218 	/* Initialize send pool locks */
1219 	pool = &dataep->ep_sndpool;
1220 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1221 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1222 
1223 	/* Initialize recv pool locks */
1224 	pool = &dataep->ep_rcvpool;
1225 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1226 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1227 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1228 
1229 	/* Initialize control endpoint */
1230 	ctrlep = &newp->session_ctrlep;
1231 	ctrlep->ep_remip = newp->session_remip;
1232 	ctrlep->ep_myip = newp->session_myip;
1233 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1234 	ctrlep->ep_sp = newp;
1235 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1236 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1237 
1238 	/* Initialize send pool locks */
1239 	pool = &ctrlep->ep_sndpool;
1240 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1241 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1242 
1243 	/* Initialize recv pool locks */
1244 	pool = &ctrlep->ep_rcvpool;
1245 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1246 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1247 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1248 
1249 	/* lkup if there is already a session */
1250 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1251 	oldp = rds_session_lkup(statep, remip, 0);
1252 	if (oldp != NULL) {
1253 		/* A session to this destination exists */
1254 		rw_exit(&statep->rds_sessionlock);
1255 		rw_destroy(&newp->session_lock);
1256 		rw_destroy(&newp->session_local_portmap_lock);
1257 		rw_destroy(&newp->session_remote_portmap_lock);
1258 		mutex_destroy(&dataep->ep_lock);
1259 		mutex_destroy(&ctrlep->ep_lock);
1260 		kmem_free(newp, sizeof (rds_session_t));
1261 		return (NULL);
1262 	}
1263 
1264 	/* Insert this session into the list */
1265 	rds_add_session(newp, B_TRUE);
1266 
1267 	/* unlock the session list */
1268 	rw_exit(&statep->rds_sessionlock);
1269 
1270 	if (type == RDS_SESSION_ACTIVE) {
1271 		ipaddr_t		localip1, remip1;
1272 		ibt_ip_path_attr_t	ipattr;
1273 		ibt_ip_addr_t		dstip;
1274 
1275 		/* The ipaddr should be in the network order */
1276 		localip1 = localip;
1277 		remip1 = remip;
1278 		ret = rds_sc_path_lookup(&localip1, &remip1);
1279 		if (ret == 0) {
1280 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1281 			    localip, remip);
1282 		}
1283 
1284 		/* Get the gids for the source and destination ip addrs */
1285 		lgid.gid_prefix = 0;
1286 		lgid.gid_guid = 0;
1287 		rgid.gid_prefix = 0;
1288 		rgid.gid_guid = 0;
1289 
1290 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1291 		dstip.family = AF_INET;
1292 		dstip.un.ip4addr = ntohl(remip1);
1293 		ipattr.ipa_dst_ip = &dstip;
1294 		ipattr.ipa_src_ip.family = AF_INET;
1295 		ipattr.ipa_src_ip.un.ip4addr = ntohl(localip1);
1296 		ipattr.ipa_ndst = 1;
1297 		ipattr.ipa_max_paths = 1;
1298 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1299 		    localip1, remip1);
1300 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1301 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1302 		    NULL, NULL);
1303 		if (ret != IBT_SUCCESS) {
1304 			RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d "
1305 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1306 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1307 
1308 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1309 			return (NULL);
1310 		}
1311 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1312 		lgid =
1313 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1314 		rgid =
1315 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1316 
1317 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1318 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1319 		    rgid.gid_guid);
1320 	}
1321 
1322 	rw_enter(&newp->session_lock, RW_WRITER);
1323 	/* check for peer-to-peer case */
1324 	if (type == newp->session_type) {
1325 		/* no peer-to-peer case */
1326 		if (type == RDS_SESSION_ACTIVE) {
1327 			newp->session_lgid = lgid;
1328 			newp->session_rgid = rgid;
1329 		} else {
1330 			/* rgid is requester gid & lgid is receiver gid */
1331 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1332 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1333 		}
1334 	}
1335 	rw_exit(&newp->session_lock);
1336 
1337 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1338 
1339 	return (newp);
1340 }
1341 
1342 void
1343 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1344 {
1345 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1346 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1347 
1348 	switch (cpkt->rcp_code) {
1349 	case RDS_CTRL_CODE_STALL:
1350 		RDS_INCR_STALLS_RCVD();
1351 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1352 		break;
1353 	case RDS_CTRL_CODE_UNSTALL:
1354 		RDS_INCR_UNSTALLS_RCVD();
1355 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1356 		break;
1357 	case RDS_CTRL_CODE_STALL_PORTS:
1358 		rds_mark_all_ports(sp, RDS_REMOTE);
1359 		break;
1360 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1361 		rds_unmark_all_ports(sp, RDS_REMOTE);
1362 		break;
1363 	case RDS_CTRL_CODE_HEARTBEAT:
1364 		break;
1365 	default:
1366 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1367 		    cpkt->rcp_code);
1368 		break;
1369 	}
1370 
1371 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1372 }
1373 
1374 int
1375 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1376 {
1377 	ibt_send_wr_t	wr;
1378 	rds_ep_t	*ep;
1379 	rds_buf_t	*bp;
1380 	rds_ctrl_pkt_t	*cp;
1381 	int		ret;
1382 
1383 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1384 	    "Port: %d", sp, code, port);
1385 
1386 	ep = &sp->session_ctrlep;
1387 
1388 	bp = rds_get_send_buf(ep, 1);
1389 	if (bp == NULL) {
1390 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1391 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1392 		    port);
1393 		return (-1);
1394 	}
1395 
1396 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1397 	cp->rcp_code = code;
1398 	cp->rcp_port = port;
1399 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1400 
1401 	wr.wr_id = (uintptr_t)bp;
1402 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1403 	wr.wr_trans = IBT_RC_SRV;
1404 	wr.wr_opcode = IBT_WRC_SEND;
1405 	wr.wr_nds = 1;
1406 	wr.wr_sgl = &bp->buf_ds;
1407 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1408 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1409 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1410 	if (ret != IBT_SUCCESS) {
1411 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1412 		    "%d", ep, ret);
1413 		bp->buf_state = RDS_SNDBUF_FREE;
1414 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1415 		return (-1);
1416 	}
1417 
1418 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1419 	    "Port: %d", sp, code, port);
1420 
1421 	return (0);
1422 }
1423 
1424 void
1425 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1426 {
1427 	int		ret;
1428 
1429 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1430 
1431 	RDS_INCR_STALLS_TRIGGERED();
1432 
1433 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1434 
1435 		if (sp != NULL) {
1436 			ret = rds_post_control_message(sp,
1437 			    RDS_CTRL_CODE_STALL, port);
1438 			if (ret != 0) {
1439 				(void) rds_check_n_unmark_port(sp, port,
1440 				    qualifier);
1441 				return;
1442 			}
1443 			RDS_INCR_STALLS_SENT();
1444 		}
1445 	} else {
1446 		RDS_DPRINTF3(LABEL,
1447 		    "Port %d is already in stall state", port);
1448 	}
1449 
1450 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1451 }
1452 
1453 void
1454 rds_resume_port(in_port_t port)
1455 {
1456 	rds_session_t	*sp;
1457 	uint_t		ix;
1458 	int		ret;
1459 
1460 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1461 
1462 	RDS_INCR_UNSTALLS_TRIGGERED();
1463 
1464 	/* resume loopback traffic */
1465 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1466 
1467 	/* send unstall messages to resume the remote traffic */
1468 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1469 
1470 	sp = rdsib_statep->rds_sessionlistp;
1471 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1472 		ASSERT(sp != NULL);
1473 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1474 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1475 				ret = rds_post_control_message(sp,
1476 				    RDS_CTRL_CODE_UNSTALL, port);
1477 				if (ret != 0) {
1478 					(void) rds_check_n_mark_port(sp, port,
1479 					    RDS_LOCAL);
1480 				} else {
1481 					RDS_INCR_UNSTALLS_SENT();
1482 				}
1483 		}
1484 
1485 		sp = sp->session_nextp;
1486 	}
1487 
1488 	rw_exit(&rdsib_statep->rds_sessionlock);
1489 
1490 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1491 }
1492 
/*
 * Split the message described by 'uiop' into packets of at most
 * UserBufferSize payload bytes, copy the data into send buffers obtained
 * from the endpoint and post them on the endpoint's channel.
 *
 * Returns 0 on success, ENOBUFS when rds_get_send_buf() returns no
 * buffers, the uiomove() error if copying the data fails, or the
 * ibt_post_send() status if posting fails. On every failure after the
 * buffers were obtained they are handed back with rds_free_send_buf().
 *
 * The size computation subtracts one from uio_resid in unsigned
 * arithmetic, so a zero-length message must not reach this function.
 */
static int
rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
    in_port_t recvport)
{
	ibt_send_wr_t	*wrp, wr;
	rds_buf_t	*bp, *bp1;
	rds_data_hdr_t	*pktp;
	uint32_t	msgsize, npkts, residual, pktno, ix;
	int		ret;

	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
	    ep, uiop);

	/*
	 * how many pkts are needed to carry this msg; 'residual' is the
	 * number of payload bytes carried by the last packet
	 */
	msgsize = uiop->uio_resid;
	npkts = ((msgsize - 1) / UserBufferSize) + 1;
	residual = ((msgsize - 1) % UserBufferSize) + 1;

	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
	    msgsize, npkts);

	/* Get the buffers needed to post this message */
	bp = rds_get_send_buf(ep, npkts);
	if (bp == NULL) {
		RDS_INCR_ENOBUFS();
		return (ENOBUFS);
	}

	if (npkts > 1) {
		/*
		 * multi-pkt messages are posted at the same time as a list
		 * of WRs
		 */
		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
		    npkts, KM_SLEEP);
	}


	/* walk the buffer chain, filling one packet per buffer */
	pktno = 0;
	bp1 = bp;
	do {
		/*
		 * prepare the header: assume a full packet, dh_npkts counts
		 * the packets from this one to the end of the message
		 */
		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
		pktp->dh_datalen = UserBufferSize;
		pktp->dh_npkts = npkts - pktno;
		pktp->dh_psn = pktno;
		pktp->dh_sendport = sendport;
		pktp->dh_recvport = recvport;
		bp1->buf_ds.ds_len = RdsPktSize;

		/* copy the data right behind the header */
		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
		    UserBufferSize, UIO_WRITE, uiop);
		if (ret != 0) {
			break;
		}

		/* last packet: trim the lengths to the bytes actually used */
		if (uiop->uio_resid == 0) {
			pktp->dh_datalen = residual;
			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
			break;
		}
		pktno++;
		bp1 = bp1->buf_nextp;
	} while (uiop->uio_resid);

	if (ret) {
		/* uiomove failed */
		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
		    uiop, ret);
		if (npkts > 1) {
			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
		}
		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
		return (ret);
	}

	if (npkts > 1) {
		/* multi-pkt message */
		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);

		/* one WR per buffer, in chain order */
		bp1 = bp;
		for (ix = 0; ix < npkts; ix++) {
			wrp[ix].wr_id = (uintptr_t)bp1;
			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
			wrp[ix].wr_trans = IBT_RC_SRV;
			wrp[ix].wr_opcode = IBT_WRC_SEND;
			wrp[ix].wr_nds = 1;
			wrp[ix].wr_sgl = &bp1->buf_ds;
			bp1 = bp1->buf_nextp;
		}
		/* only the last packet of the message is sent solicited */
		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;

		/*
		 * NOTE(review): on failure all npkts buffers are freed and
		 * the posted count returned in 'ix' is not consulted;
		 * confirm that none of the WRs can have been queued.
		 */
		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
		if (ret != IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
			    "%d for %d pkts", ep, ret, npkts);
			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
			return (ret);
		}

		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
	} else {
		/* single pkt: use the WR on the stack */
		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
		wr.wr_id = (uintptr_t)bp;
		wr.wr_flags = IBT_WR_SEND_SOLICIT;
		wr.wr_trans = IBT_RC_SRV;
		wr.wr_opcode = IBT_WRC_SEND;
		wr.wr_nds = 1;
		wr.wr_sgl = &bp->buf_ds;
		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
		if (ret != IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
			    "%d", ep, ret);
			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
			return (ret);
		}
	}

	/* account for the packets and bytes handed to the channel */
	RDS_INCR_TXPKTS(npkts);
	RDS_INCR_TXBYTES(msgsize);

	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
	    ep, uiop);

	return (0);
}
1624 
1625 static int
1626 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1627     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1628 {
1629 	mblk_t		*mp;
1630 	int		ret;
1631 
1632 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1633 
1634 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1635 	    "%d to recvport: %d", sendport, recvport);
1636 
1637 	mp = allocb(uiop->uio_resid, BPRI_MED);
1638 	if (mp == NULL) {
1639 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1640 		    uiop->uio_resid);
1641 		return (ENOSPC);
1642 	}
1643 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1644 
1645 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1646 	if (ret) {
1647 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1648 		freeb(mp);
1649 		return (ret);
1650 	}
1651 
1652 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1653 	    zoneid);
1654 	if (ret != 0) {
1655 		if (ret == ENOSPC) {
1656 			/*
1657 			 * The message is delivered but cannot take more,
1658 			 * stop further loopback traffic to this port
1659 			 */
1660 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1661 			    "Port %d NO SPACE", recvport);
1662 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1663 		} else {
1664 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1665 			    "port %d failed: %d", sendport, recvport, ret);
1666 			return (ret);
1667 		}
1668 	}
1669 
1670 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1671 	return (0);
1672 }
1673 
1674 static void
1675 rds_resend_messages(void *arg)
1676 {
1677 	rds_session_t	*sp = (rds_session_t *)arg;
1678 	rds_ep_t	*ep;
1679 	rds_bufpool_t	*spool;
1680 	rds_buf_t	*bp, *endp, *tmp;
1681 	ibt_send_wr_t	*wrp;
1682 	uint_t		nwr = 0, ix, jx;
1683 	int		ret;
1684 
1685 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1686 
1687 	ep = &sp->session_dataep;
1688 
1689 	spool = &ep->ep_sndpool;
1690 	mutex_enter(&spool->pool_lock);
1691 
1692 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1693 
1694 	if (ep->ep_lbufid == NULL) {
1695 		RDS_DPRINTF2("rds_resend_messages",
1696 		    "SP(%p) Remote session is cleaned up ", sp);
1697 		/*
1698 		 * The remote end cleaned up its session. There may be loss
1699 		 * of messages. Mark all buffers as acknowledged.
1700 		 */
1701 		tmp = spool->pool_tailp;
1702 	} else {
1703 		tmp = (rds_buf_t *)ep->ep_lbufid;
1704 		RDS_DPRINTF2("rds_resend_messages",
1705 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1706 	}
1707 
1708 	endp = spool->pool_tailp;
1709 	bp = spool->pool_headp;
1710 	jx = 0;
1711 	while ((bp != NULL) && (bp != tmp)) {
1712 		bp->buf_state = RDS_SNDBUF_FREE;
1713 		jx++;
1714 		bp = bp->buf_nextp;
1715 	}
1716 
1717 	if (bp == NULL) {
1718 		mutex_exit(&spool->pool_lock);
1719 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1720 		    "found in the list", tmp);
1721 
1722 		rw_enter(&sp->session_lock, RW_WRITER);
1723 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1724 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1725 		} else {
1726 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1727 			    "Expected State: %d", sp, sp->session_state,
1728 			    RDS_SESSION_STATE_CONNECTED);
1729 		}
1730 		sp->session_failover = 0;
1731 		rw_exit(&sp->session_lock);
1732 		return;
1733 	}
1734 
1735 	/* Found the match */
1736 	bp->buf_state = RDS_SNDBUF_FREE;
1737 	jx++;
1738 
1739 	spool->pool_tailp = bp;
1740 	bp = bp->buf_nextp;
1741 	spool->pool_tailp->buf_nextp = NULL;
1742 	nwr = spool->pool_nfree - jx;
1743 	spool->pool_nfree = jx;
1744 	mutex_exit(&spool->pool_lock);
1745 
1746 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1747 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1748 
1749 	if (bp) {
1750 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1751 		    KM_SLEEP);
1752 
1753 		while (nwr) {
1754 			jx = (nwr > 100) ? 100 : nwr;
1755 
1756 			tmp = bp;
1757 			for (ix = 0; ix < jx; ix++) {
1758 				bp->buf_state = RDS_SNDBUF_PENDING;
1759 				wrp[ix].wr_id = (uintptr_t)bp;
1760 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1761 				wrp[ix].wr_trans = IBT_RC_SRV;
1762 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1763 				wrp[ix].wr_nds = 1;
1764 				wrp[ix].wr_sgl = &bp->buf_ds;
1765 				bp = bp->buf_nextp;
1766 			}
1767 
1768 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1769 			if (ret != IBT_SUCCESS) {
1770 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1771 				    "failed: %d for % pkts", ep, ret, jx);
1772 				break;
1773 			}
1774 
1775 			mutex_enter(&spool->pool_lock);
1776 			spool->pool_nbusy += jx;
1777 			mutex_exit(&spool->pool_lock);
1778 
1779 			nwr -= jx;
1780 		}
1781 
1782 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1783 
1784 		if (nwr != 0) {
1785 
1786 			/*
1787 			 * An error while failover is in progress. Some WRs are
1788 			 * posted while other remain. If any of the posted WRs
1789 			 * complete in error then they would dispatch a taskq to
1790 			 * do a failover. Getting the session lock will prevent
1791 			 * the taskq to wait until we are done here.
1792 			 */
1793 			rw_enter(&sp->session_lock, RW_READER);
1794 
1795 			/*
1796 			 * Wait until all the previous WRs are completed and
1797 			 * then queue the remaining, otherwise the order of
1798 			 * the messages may change.
1799 			 */
1800 			(void) rds_is_sendq_empty(ep, 1);
1801 
1802 			/* free the remaining buffers */
1803 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1804 
1805 			rw_exit(&sp->session_lock);
1806 			return;
1807 		}
1808 	}
1809 
1810 	rw_enter(&sp->session_lock, RW_WRITER);
1811 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1812 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1813 	} else {
1814 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1815 		    "Expected State: %d", sp, sp->session_state,
1816 		    RDS_SESSION_STATE_CONNECTED);
1817 	}
1818 	sp->session_failover = 0;
1819 	rw_exit(&sp->session_lock);
1820 
1821 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1822 }
1823 
1824 /*
1825  * This is called when a channel is connected. Transition the session to
1826  * CONNECTED state iff both channels are connected.
1827  */
1828 void
1829 rds_session_active(rds_session_t *sp)
1830 {
1831 	rds_ep_t	*ep;
1832 	uint_t		failover;
1833 
1834 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1835 
1836 	rw_enter(&sp->session_lock, RW_READER);
1837 
1838 	failover = sp->session_failover;
1839 
1840 	/*
1841 	 * we establish the data channel first, so check the control channel
1842 	 * first but make sure it is initialized.
1843 	 */
1844 	ep = &sp->session_ctrlep;
1845 	mutex_enter(&ep->ep_lock);
1846 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1847 		/* the session is not ready yet */
1848 		mutex_exit(&ep->ep_lock);
1849 		rw_exit(&sp->session_lock);
1850 		return;
1851 	}
1852 	mutex_exit(&ep->ep_lock);
1853 
1854 	/* control channel is connected, check the data channel */
1855 	ep = &sp->session_dataep;
1856 	mutex_enter(&ep->ep_lock);
1857 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1858 		/* data channel is not yet connected */
1859 		mutex_exit(&ep->ep_lock);
1860 		rw_exit(&sp->session_lock);
1861 		return;
1862 	}
1863 	mutex_exit(&ep->ep_lock);
1864 
1865 	if (failover) {
1866 		rw_exit(&sp->session_lock);
1867 
1868 		/*
1869 		 * The session has failed over. Previous msgs have to be
1870 		 * re-sent before the session is moved to the connected
1871 		 * state.
1872 		 */
1873 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1874 		    "to re-send messages", sp);
1875 		(void) ddi_taskq_dispatch(rds_taskq,
1876 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1877 		return;
1878 	}
1879 
1880 	/* the session is ready */
1881 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1882 	RDS_DPRINTF3("rds_session_active",
1883 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1884 
1885 	rw_exit(&sp->session_lock);
1886 
1887 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1888 }
1889 
1890 static int
1891 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1892     in_port_t recvport)
1893 {
1894 	int	ret;
1895 
1896 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1897 	    "%d", ep, sendport, recvport);
1898 
1899 	/* make sure the remote port is not stalled */
1900 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1901 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1902 		    ep->ep_sp, recvport);
1903 		RDS_INCR_EWOULDBLOCK();
1904 		ret = ENOMEM;
1905 	} else {
1906 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1907 	}
1908 
1909 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1910 
1911 	return (ret);
1912 }
1913 
1914 /* Send a message to a destination socket */
1915 int
1916 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1917     in_port_t recvport, zoneid_t zoneid)
1918 {
1919 	rds_session_t	*sp;
1920 	ib_gid_t	lgid, rgid;
1921 	int		ret;
1922 
1923 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1924 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1925 	    sendport, recvport);
1926 
1927 	/* If msg length is 0, just return success */
1928 	if (uiop->uio_resid == 0) {
1929 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1930 		return (0);
1931 	}
1932 
1933 	/* Is there a session to the destination? */
1934 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1935 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1936 	rw_exit(&rdsib_statep->rds_sessionlock);
1937 
1938 	/* Is this a loopback message? */
1939 	if ((sp == NULL) && (rds_islocal(recvip))) {
1940 		/* make sure the port is not stalled */
1941 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
1942 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1943 			    recvport);
1944 			RDS_INCR_EWOULDBLOCK();
1945 			return (ENOMEM);
1946 		}
1947 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1948 		    sendport, zoneid);
1949 		return (ret);
1950 	}
1951 
1952 	/* Not a loopback message */
1953 	if (sp == NULL) {
1954 		/* There is no session to the destination, create one. */
1955 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1956 		    "IP: 0x%x", recvip);
1957 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1958 		    RDS_SESSION_ACTIVE);
1959 		if (sp != NULL) {
1960 			rw_enter(&sp->session_lock, RW_WRITER);
1961 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1962 				ret = rds_session_init(sp);
1963 				if (ret != 0) {
1964 					RDS_DPRINTF2("rds_sendmsg",
1965 					    "SP(%p): rds_session_init failed",
1966 					    sp);
1967 					sp->session_state =
1968 					    RDS_SESSION_STATE_FAILED;
1969 					RDS_DPRINTF3("rds_sendmsg",
1970 					    "SP(%p) State "
1971 					    "RDS_SESSION_STATE_FAILED", sp);
1972 					rw_exit(&sp->session_lock);
1973 					return (EFAULT);
1974 				}
1975 				sp->session_state = RDS_SESSION_STATE_INIT;
1976 				RDS_DPRINTF3("rds_sendmsg",
1977 				    "SP(%p) State "
1978 				    "RDS_SESSION_STATE_INIT", sp);
1979 				rw_exit(&sp->session_lock);
1980 				rds_session_open(sp);
1981 			} else {
1982 				rw_exit(&sp->session_lock);
1983 			}
1984 		} else {
1985 			/* Is a session created for this destination */
1986 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1987 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
1988 			rw_exit(&rdsib_statep->rds_sessionlock);
1989 			if (sp == NULL) {
1990 				return (EFAULT);
1991 			}
1992 		}
1993 	}
1994 
1995 	/* There is a session to the destination */
1996 	rw_enter(&sp->session_lock, RW_READER);
1997 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1998 		rw_exit(&sp->session_lock);
1999 
2000 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2001 		    recvport);
2002 		return (ret);
2003 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2004 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2005 		ipaddr_t sendip1, recvip1;
2006 
2007 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
2008 		    "%d", sp, sp->session_state);
2009 		rw_exit(&sp->session_lock);
2010 		rw_enter(&sp->session_lock, RW_WRITER);
2011 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
2012 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
2013 			ibt_ip_path_attr_t	ipattr;
2014 			ibt_ip_addr_t		dstip;
2015 
2016 			sp->session_state = RDS_SESSION_STATE_CREATED;
2017 			sp->session_type = RDS_SESSION_ACTIVE;
2018 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
2019 			    "RDS_SESSION_STATE_CREATED", sp);
2020 			rw_exit(&sp->session_lock);
2021 
2022 
2023 			/* The ipaddr should be in the network order */
2024 			sendip1 = sendip;
2025 			recvip1 = recvip;
2026 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
2027 			if (ret == 0) {
2028 				RDS_DPRINTF2(LABEL, "Path not found "
2029 				    "(0x%x 0x%x)", sendip1, recvip1);
2030 			}
2031 
2032 			/* Resolve the IP addresses */
2033 			lgid.gid_prefix = 0;
2034 			lgid.gid_guid = 0;
2035 			rgid.gid_prefix = 0;
2036 			rgid.gid_guid = 0;
2037 
2038 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
2039 			dstip.family = AF_INET;
2040 			dstip.un.ip4addr = htonl(recvip1);
2041 			ipattr.ipa_dst_ip = &dstip;
2042 			ipattr.ipa_src_ip.family = AF_INET;
2043 			ipattr.ipa_src_ip.un.ip4addr = htonl(sendip1);
2044 			ipattr.ipa_ndst = 1;
2045 			ipattr.ipa_max_paths = 1;
2046 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2047 			    sendip1, recvip1);
2048 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2049 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2050 			    NULL, NULL);
2051 			if (ret != IBT_SUCCESS) {
2052 				RDS_DPRINTF1("rds_sendmsg",
2053 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2054 
2055 				rw_enter(&sp->session_lock, RW_WRITER);
2056 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2057 					sp->session_state =
2058 					    RDS_SESSION_STATE_FAILED;
2059 					RDS_DPRINTF3("rds_sendmsg",
2060 					    "SP(%p) State "
2061 					    "RDS_SESSION_STATE_FAILED", sp);
2062 					rw_exit(&sp->session_lock);
2063 					return (EFAULT);
2064 				} else {
2065 					rw_exit(&sp->session_lock);
2066 					return (ENOMEM);
2067 				}
2068 			}
2069 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2070 			lgid = sp->session_pinfo.
2071 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2072 			rgid = sp->session_pinfo.
2073 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2074 
2075 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2076 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2077 			    rgid.gid_guid);
2078 
2079 			rw_enter(&sp->session_lock, RW_WRITER);
2080 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2081 				sp->session_lgid = lgid;
2082 				sp->session_rgid = rgid;
2083 				ret = rds_session_init(sp);
2084 				if (ret != 0) {
2085 					RDS_DPRINTF2("rds_sendmsg",
2086 					    "SP(%p): rds_session_init failed",
2087 					    sp);
2088 					sp->session_state =
2089 					    RDS_SESSION_STATE_FAILED;
2090 					RDS_DPRINTF3("rds_sendmsg",
2091 					    "SP(%p) State "
2092 					    "RDS_SESSION_STATE_FAILED", sp);
2093 					rw_exit(&sp->session_lock);
2094 					return (EFAULT);
2095 				}
2096 				sp->session_state = RDS_SESSION_STATE_INIT;
2097 				rw_exit(&sp->session_lock);
2098 
2099 				rds_session_open(sp);
2100 
2101 			} else {
2102 				RDS_DPRINTF2("rds_sendmsg",
2103 				    "SP(%p): type changed to %d",
2104 				    sp, sp->session_type);
2105 				rw_exit(&sp->session_lock);
2106 				return (ENOMEM);
2107 			}
2108 		} else {
2109 			RDS_DPRINTF2("rds_sendmsg",
2110 			    "SP(%p): Session state %d changed",
2111 			    sp, sp->session_state);
2112 			rw_exit(&sp->session_lock);
2113 			return (ENOMEM);
2114 		}
2115 	} else {
2116 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): Session is in %d state",
2117 		    sp, sp->session_state);
2118 		rw_exit(&sp->session_lock);
2119 		return (ENOMEM);
2120 	}
2121 
2122 	rw_enter(&sp->session_lock, RW_READER);
2123 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2124 		rw_exit(&sp->session_lock);
2125 
2126 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2127 		    recvport);
2128 	} else {
2129 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2130 		    sp, sp->session_state);
2131 		rw_exit(&sp->session_lock);
2132 	}
2133 
2134 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2135 
2136 	return (ret);
2137 }
2138 
2139 /* Note: This is called on the CQ handler thread */
2140 void
2141 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2142 {
2143 	mblk_t		*mp, *mp1;
2144 	rds_data_hdr_t	*pktp, *pktp1;
2145 	uint8_t		*datap;
2146 	rds_buf_t	*bp1;
2147 	rds_bufpool_t	*rpool;
2148 	uint_t		npkts, ix;
2149 	int		ret;
2150 
2151 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2152 
2153 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2154 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2155 	npkts = pktp->dh_npkts;
2156 
2157 	/* increment rx pending here */
2158 	rpool = &ep->ep_rcvpool;
2159 	mutex_enter(&rpool->pool_lock);
2160 	rpool->pool_nbusy += npkts;
2161 	mutex_exit(&rpool->pool_lock);
2162 
2163 	/* this will get freed by sockfs */
2164 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2165 	if (mp == NULL) {
2166 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2167 		    ep, bp);
2168 		rds_free_recv_buf(bp, npkts);
2169 		return;
2170 	}
2171 	mp->b_wptr = datap + pktp->dh_datalen;
2172 	mp->b_datap->db_type = M_DATA;
2173 
2174 	mp1 = mp;
2175 	bp1 = bp->buf_nextp;
2176 	while (bp1 != NULL) {
2177 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2178 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2179 		    RDS_DATA_HDR_SZ;
2180 
2181 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2182 		    BPRI_HI, &bp1->buf_frtn);
2183 		if (mp1->b_cont == NULL) {
2184 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2185 			    ep, bp1);
2186 			freemsg(mp);
2187 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2188 			return;
2189 		}
2190 		mp1 = mp1->b_cont;
2191 		mp1->b_wptr = datap + pktp1->dh_datalen;
2192 		mp1->b_datap->db_type = M_DATA;
2193 
2194 		bp1 = bp1->buf_nextp;
2195 	}
2196 
2197 	RDS_INCR_RXPKTS_PEND(npkts);
2198 	RDS_INCR_RXPKTS(npkts);
2199 	RDS_INCR_RXBYTES(msgdsize(mp));
2200 
2201 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2202 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2203 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2204 	    npkts, pktp->dh_psn);
2205 
2206 	/* store the last buffer id, no lock needed */
2207 	if (npkts > 1) {
2208 		ep->ep_rbufid = pktp1->dh_bufid;
2209 	} else {
2210 		ep->ep_rbufid = pktp->dh_bufid;
2211 	}
2212 
2213 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2214 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2215 	if (ret != 0) {
2216 		if (ret == ENOSPC) {
2217 			/*
2218 			 * The message is delivered but cannot take more,
2219 			 * stop further remote messages coming to this port
2220 			 */
2221 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2222 			    pktp->dh_recvport);
2223 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2224 		} else {
2225 			RDS_DPRINTF1(LABEL, "rds_deliver_new_msg returned: %d",
2226 			    ret);
2227 		}
2228 	}
2229 
2230 	mutex_enter(&ep->ep_lock);
2231 	/* The first message can come in before the conn est event */
2232 	if ((ep->ep_rdmacnt == 0) && (ep->ep_state == RDS_EP_STATE_CONNECTED)) {
2233 		ep->ep_rdmacnt++;
2234 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2235 		mutex_exit(&ep->ep_lock);
2236 
2237 		/* send acknowledgement */
2238 		RDS_INCR_TXACKS();
2239 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2240 		if (ret != IBT_SUCCESS) {
2241 			RDS_DPRINTF1(LABEL, "EP(%p): ibt_post_send for "
2242 			    "acknowledgement failed: %d, SQ depth: %d",
2243 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2244 			mutex_enter(&ep->ep_lock);
2245 			ep->ep_rdmacnt--;
2246 			mutex_exit(&ep->ep_lock);
2247 		}
2248 	} else {
2249 		/* no room to send acknowledgement */
2250 		mutex_exit(&ep->ep_lock);
2251 	}
2252 
2253 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2254 }
2255