xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision bea83d026ee1bd1b2a2419e1d0232f107a5d7d9b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #pragma ident	"%Z%%M%	%I%	%E% SMI"
76 
77 #include <sys/stream.h>
78 #include <sys/ib/clients/rds/rdsib_cm.h>
79 #include <sys/ib/clients/rds/rdsib_ib.h>
80 #include <sys/ib/clients/rds/rdsib_buf.h>
81 #include <sys/ib/clients/rds/rdsib_ep.h>
82 #include <sys/ib/clients/rds/rds_kstat.h>
83 #include <sys/zone.h>
84 
85 #define	RDS_POLL_CQ_IN_2TICKS	1
86 
87 /*
 * This file contains the endpoint-related calls.
89  */
90 
/* External symbols declared here for use by this file. */
extern boolean_t rds_islocal(ipaddr_t addr);
extern uint_t rds_wc_signal;

/*
 * Port-map qualifiers. The port marking helpers below switch on these to
 * pick the loopback map, the session's local map or the session's remote map.
 */
#define	RDS_LOOPBACK	0
#define	RDS_LOCAL	1
#define	RDS_REMOTE	2

/* NOTE(review): not referenced in the visible part of this file - confirm use */
#define	IBT_IPADDR	1
99 
100 static uint8_t
101 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
102 {
103 	uint8_t	ret;
104 
105 	switch (qualifier) {
106 	case RDS_LOOPBACK: /* loopback */
107 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
108 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
109 		rw_exit(&rds_loopback_portmap_lock);
110 		break;
111 
112 	case RDS_LOCAL: /* Session local */
113 		ASSERT(sp != NULL);
114 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
115 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
116 		rw_exit(&sp->session_local_portmap_lock);
117 		break;
118 
119 	case RDS_REMOTE: /* Session remote */
120 		ASSERT(sp != NULL);
121 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
122 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
123 		rw_exit(&sp->session_remote_portmap_lock);
124 		break;
125 	}
126 
127 	return (ret);
128 }
129 
130 static uint8_t
131 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
132 {
133 	uint8_t	ret;
134 
135 	switch (qualifier) {
136 	case RDS_LOOPBACK: /* loopback */
137 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
138 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
139 		if (!ret) {
140 			/* port is not marked, mark it */
141 			rds_loopback_portmap[port/8] =
142 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
143 		}
144 		rw_exit(&rds_loopback_portmap_lock);
145 		break;
146 
147 	case RDS_LOCAL: /* Session local */
148 		ASSERT(sp != NULL);
149 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
150 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
151 		if (!ret) {
152 			/* port is not marked, mark it */
153 			sp->session_local_portmap[port/8] =
154 			    sp->session_local_portmap[port/8] |
155 			    (1 << (port % 8));
156 		}
157 		rw_exit(&sp->session_local_portmap_lock);
158 		break;
159 
160 	case RDS_REMOTE: /* Session remote */
161 		ASSERT(sp != NULL);
162 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
163 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
164 		if (!ret) {
165 			/* port is not marked, mark it */
166 			sp->session_remote_portmap[port/8] =
167 			    sp->session_remote_portmap[port/8] |
168 			    (1 << (port % 8));
169 		}
170 		rw_exit(&sp->session_remote_portmap_lock);
171 		break;
172 	}
173 
174 	return (ret);
175 }
176 
177 static uint8_t
178 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
179 {
180 	uint8_t	ret;
181 
182 	switch (qualifier) {
183 	case RDS_LOOPBACK: /* loopback */
184 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
185 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
186 		if (ret) {
187 			/* port is marked, unmark it */
188 			rds_loopback_portmap[port/8] =
189 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
190 		}
191 		rw_exit(&rds_loopback_portmap_lock);
192 		break;
193 
194 	case RDS_LOCAL: /* Session local */
195 		ASSERT(sp != NULL);
196 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
197 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
198 		if (ret) {
199 			/* port is marked, unmark it */
200 			sp->session_local_portmap[port/8] =
201 			    sp->session_local_portmap[port/8] &
202 			    ~(1 << (port % 8));
203 		}
204 		rw_exit(&sp->session_local_portmap_lock);
205 		break;
206 
207 	case RDS_REMOTE: /* Session remote */
208 		ASSERT(sp != NULL);
209 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
210 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
211 		if (ret) {
212 			/* port is marked, unmark it */
213 			sp->session_remote_portmap[port/8] =
214 			    sp->session_remote_portmap[port/8] &
215 			    ~(1 << (port % 8));
216 		}
217 		rw_exit(&sp->session_remote_portmap_lock);
218 		break;
219 	}
220 
221 	return (ret);
222 }
223 
224 static void
225 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
226 {
227 	switch (qualifier) {
228 	case RDS_LOOPBACK: /* loopback */
229 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
230 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
231 		rw_exit(&rds_loopback_portmap_lock);
232 		break;
233 
234 	case RDS_LOCAL: /* Session local */
235 		ASSERT(sp != NULL);
236 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
237 		(void) memset(sp->session_local_portmap, 0xFF,
238 		    RDS_PORT_MAP_SIZE);
239 		rw_exit(&sp->session_local_portmap_lock);
240 		break;
241 
242 	case RDS_REMOTE: /* Session remote */
243 		ASSERT(sp != NULL);
244 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
245 		(void) memset(sp->session_remote_portmap, 0xFF,
246 		    RDS_PORT_MAP_SIZE);
247 		rw_exit(&sp->session_remote_portmap_lock);
248 		break;
249 	}
250 }
251 
252 static void
253 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
254 {
255 	switch (qualifier) {
256 	case RDS_LOOPBACK: /* loopback */
257 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
258 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
259 		rw_exit(&rds_loopback_portmap_lock);
260 		break;
261 
262 	case RDS_LOCAL: /* Session local */
263 		ASSERT(sp != NULL);
264 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
265 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
266 		rw_exit(&sp->session_local_portmap_lock);
267 		break;
268 
269 	case RDS_REMOTE: /* Session remote */
270 		ASSERT(sp != NULL);
271 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
272 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
273 		rw_exit(&sp->session_remote_portmap_lock);
274 		break;
275 	}
276 }
277 
278 static void
279 rds_add_session(rds_session_t *sp, boolean_t locked)
280 {
281 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
282 
283 	if (!locked) {
284 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
285 	}
286 
287 	sp->session_nextp = rdsib_statep->rds_sessionlistp;
288 	rdsib_statep->rds_sessionlistp = sp;
289 	rdsib_statep->rds_nsessions++;
290 
291 	if (!locked) {
292 		rw_exit(&rdsib_statep->rds_sessionlock);
293 	}
294 	RDS_INCR_SESS();
295 
296 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
297 }
298 
299 /* Session lookup based on destination IP or destination node guid */
300 rds_session_t *
301 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
302 {
303 	rds_session_t	*sp;
304 
305 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
306 	    remoteip, node_guid);
307 
308 	/* A read/write lock is expected, will panic if none of them are held */
309 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
310 	sp = statep->rds_sessionlistp;
311 	while (sp) {
312 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
313 		    (sp->session_rgid.gid_guid == node_guid))) {
314 			break;
315 		}
316 
317 		sp = sp->session_nextp;
318 	}
319 
320 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
321 
322 	return (sp);
323 }
324 
325 boolean_t
326 rds_session_lkup_by_sp(rds_session_t *sp)
327 {
328 	rds_session_t *sessionp;
329 
330 	RDS_DPRINTF4("rds_session_lkup_by_sp", "Enter: 0x%p", sp);
331 
332 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
333 	sessionp = rdsib_statep->rds_sessionlistp;
334 	while (sessionp) {
335 		if (sessionp == sp) {
336 			rw_exit(&rdsib_statep->rds_sessionlock);
337 			return (B_TRUE);
338 		}
339 
340 		sessionp = sessionp->session_nextp;
341 	}
342 	rw_exit(&rdsib_statep->rds_sessionlock);
343 
344 	return (B_FALSE);
345 }
346 
347 static void
348 rds_ep_fini(rds_ep_t *ep)
349 {
350 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
351 
352 	/* free send pool */
353 	rds_free_send_pool(ep);
354 
355 	/* free recv pool */
356 	rds_free_recv_pool(ep);
357 
358 	mutex_enter(&ep->ep_lock);
359 	ep->ep_hca_guid = 0;
360 	mutex_exit(&ep->ep_lock);
361 
362 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
363 }
364 
365 /* Assumes SP write lock is held */
366 int
367 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
368 {
369 	uint_t		ret;
370 
371 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
372 
373 	/* send pool */
374 	ret = rds_init_send_pool(ep, hca_guid);
375 	if (ret != 0) {
376 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
377 		    ep, ret);
378 		return (-1);
379 	}
380 
381 	/* recv pool */
382 	ret = rds_init_recv_pool(ep);
383 	if (ret != 0) {
384 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
385 		    ep, ret);
386 		rds_free_send_pool(ep);
387 		return (-1);
388 	}
389 
390 	/* reset the ep state */
391 	mutex_enter(&ep->ep_lock);
392 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
393 	ep->ep_hca_guid = hca_guid;
394 	ep->ep_lbufid = NULL;
395 	ep->ep_rbufid = NULL;
396 	ep->ep_segfbp = NULL;
397 	ep->ep_seglbp = NULL;
398 
399 	/* Initialize the WR to send acknowledgements */
400 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
401 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
402 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
403 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
404 	ep->ep_ackwr.wr_nds = 1;
405 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
406 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
407 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
408 	mutex_exit(&ep->ep_lock);
409 
410 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
411 
412 	return (0);
413 }
414 
415 static int
416 rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
417 {
418 	int	ret;
419 
420 	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
421 	    ep, ep->ep_type);
422 
423 	/* Re-initialize send pool */
424 	ret = rds_reinit_send_pool(ep, hca_guid);
425 	if (ret != 0) {
426 		RDS_DPRINTF2("rds_ep_reinit",
427 		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
428 		return (-1);
429 	}
430 
431 	/* free all the receive buffers in the pool */
432 	rds_free_recv_pool(ep);
433 
434 	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
435 	    ep, ep->ep_type);
436 
437 	return (0);
438 }
439 
440 void
441 rds_session_fini(rds_session_t *sp)
442 {
443 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
444 
445 	rds_ep_fini(&sp->session_dataep);
446 	rds_ep_fini(&sp->session_ctrlep);
447 
448 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
449 }
450 
451 /*
452  * Allocate and initialize the resources needed for the control and
453  * data channels
454  */
455 int
456 rds_session_init(rds_session_t *sp)
457 {
458 	int		ret;
459 	rds_hca_t	*hcap;
460 	ib_guid_t	hca_guid;
461 
462 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
463 
464 	/* CALLED WITH SESSION WRITE LOCK */
465 
466 	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
467 	if (hcap == NULL) {
468 		RDS_DPRINTF1("rds_session_init", "SGID is on an uninitialized "
469 		    "HCA: %llx", sp->session_lgid.gid_guid);
470 		return (-1);
471 	}
472 
473 	hca_guid = hcap->hca_guid;
474 
475 	/* allocate and initialize the ctrl channel */
476 	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
477 	if (ret != 0) {
478 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
479 		    "failed", sp, &sp->session_ctrlep);
480 		return (-1);
481 	}
482 
483 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
484 
485 	/* allocate and initialize the data channel */
486 	ret = rds_ep_init(&sp->session_dataep, hca_guid);
487 	if (ret != 0) {
488 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
489 		    "failed", sp, &sp->session_dataep);
490 		rds_ep_fini(&sp->session_ctrlep);
491 		return (-1);
492 	}
493 
494 	/* Clear the portmaps */
495 	rds_unmark_all_ports(sp, RDS_LOCAL);
496 	rds_unmark_all_ports(sp, RDS_REMOTE);
497 
498 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
499 
500 	RDS_DPRINTF2("rds_session_init", "Return");
501 
502 	return (0);
503 }
504 
505 /*
506  * This should be called before moving a session from ERROR state to
507  * INIT state. This will update the HCA keys incase the session has moved from
508  * one HCA to another.
509  */
510 int
511 rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
512 {
513 	rds_hca_t	*hcap, *hcap1;
514 	int		ret;
515 
516 	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p)", sp);
517 
518 	/* CALLED WITH SESSION WRITE LOCK */
519 
520 	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
521 	if (hcap == NULL) {
522 		RDS_DPRINTF1("rds_session_reinit", "SGID is on an "
523 		    "uninitialized HCA: %llx", lgid.gid_guid);
524 		return (-1);
525 	}
526 
527 	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
528 	if (hcap1 == NULL) {
529 		RDS_DPRINTF1("rds_session_reinit", "Seems like HCA %llx "
530 		    "is unplugged", sp->session_lgid.gid_guid);
531 	} else if (hcap->hca_guid == hcap1->hca_guid) {
532 		/*
533 		 * No action is needed as the session did not move across
534 		 * HCAs
535 		 */
536 		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
537 		return (0);
538 	}
539 
540 	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");
541 
542 	/* re-initialize the control channel */
543 	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
544 	if (ret != 0) {
545 		RDS_DPRINTF2("rds_session_reinit",
546 		    "SP(%p): Ctrl EP(%p) re-initialization failed",
547 		    sp, &sp->session_ctrlep);
548 		return (-1);
549 	}
550 
551 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
552 	    sp, &sp->session_ctrlep);
553 
554 	/* re-initialize the data channel */
555 	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
556 	if (ret != 0) {
557 		RDS_DPRINTF2("rds_session_reinit",
558 		    "SP(%p): Data EP(%p) re-initialization failed",
559 		    sp, &sp->session_dataep);
560 		return (-1);
561 	}
562 
563 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
564 	    sp, &sp->session_dataep);
565 
566 	sp->session_lgid = lgid;
567 
568 	/* Clear the portmaps */
569 	rds_unmark_all_ports(sp, RDS_LOCAL);
570 	rds_unmark_all_ports(sp, RDS_REMOTE);
571 
572 	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);
573 
574 	return (0);
575 }
576 
577 static int
578 rds_session_connect(rds_session_t *sp)
579 {
580 	ibt_channel_hdl_t	ctrlchan, datachan;
581 	rds_ep_t		*ep;
582 	int			ret;
583 
584 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
585 
586 	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;
587 
588 	/* Override the packet life time based on the conf file */
589 	if (IBPktLifeTime != 0) {
590 		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
591 		    IBPktLifeTime;
592 	}
593 
594 	/* Session type may change if we run into peer-to-peer case. */
595 	rw_enter(&sp->session_lock, RW_READER);
596 	if (sp->session_type == RDS_SESSION_PASSIVE) {
597 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
598 		    "active end", sp);
599 		rw_exit(&sp->session_lock);
600 		return (0); /* return success */
601 	}
602 	rw_exit(&sp->session_lock);
603 
604 	/* connect the data ep first */
605 	ep = &sp->session_dataep;
606 	mutex_enter(&ep->ep_lock);
607 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
608 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
609 		mutex_exit(&ep->ep_lock);
610 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
611 		    &datachan);
612 		if (ret != IBT_SUCCESS) {
613 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
614 			    "failed: %d", ep, ret);
615 			return (-1);
616 		}
617 		sp->session_dataep.ep_chanhdl = datachan;
618 	} else {
619 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
620 		    "unexpected state: %d", sp, ep, ep->ep_state);
621 		mutex_exit(&ep->ep_lock);
622 		return (-1);
623 	}
624 
625 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
626 	    sp, ep);
627 
628 	ep = &sp->session_ctrlep;
629 	mutex_enter(&ep->ep_lock);
630 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
631 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
632 		mutex_exit(&ep->ep_lock);
633 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
634 		    &ctrlchan);
635 		if (ret != IBT_SUCCESS) {
636 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
637 			    "failed: %d", ep, ret);
638 			return (-1);
639 		}
640 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
641 	} else {
642 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
643 		    "unexpected state: %d", sp, ep, ep->ep_state);
644 		mutex_exit(&ep->ep_lock);
645 		return (-1);
646 	}
647 
648 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
649 	    sp, sp->session_myip, sp->session_remip);
650 
651 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
652 
653 	return (0);
654 }
655 
656 /*
657  * Can be called with or without session_lock.
658  */
659 void
660 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
661 {
662 	rds_ep_t		*ep;
663 
664 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
665 	    sp->session_state);
666 
667 	ep = &sp->session_dataep;
668 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
669 
670 	/* wait until the SQ is empty before closing */
671 	(void) rds_is_sendq_empty(ep, wait);
672 
673 	mutex_enter(&ep->ep_lock);
674 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
675 		mutex_exit(&ep->ep_lock);
676 		delay(drv_usectohz(300000));
677 		mutex_enter(&ep->ep_lock);
678 	}
679 
680 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
681 		ep->ep_state = RDS_EP_STATE_CLOSING;
682 		mutex_exit(&ep->ep_lock);
683 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
684 		mutex_enter(&ep->ep_lock);
685 	}
686 	rds_ep_free_rc_channel(ep);
687 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
688 	ep->ep_segfbp = NULL;
689 	ep->ep_seglbp = NULL;
690 	mutex_exit(&ep->ep_lock);
691 
692 	ep = &sp->session_ctrlep;
693 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
694 
695 	/* wait until the SQ is empty before closing */
696 	(void) rds_is_sendq_empty(ep, 1);
697 
698 	mutex_enter(&ep->ep_lock);
699 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
700 		mutex_exit(&ep->ep_lock);
701 		delay(drv_usectohz(300000));
702 		mutex_enter(&ep->ep_lock);
703 	}
704 
705 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
706 		ep->ep_state = RDS_EP_STATE_CLOSING;
707 		mutex_exit(&ep->ep_lock);
708 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
709 		mutex_enter(&ep->ep_lock);
710 	}
711 	rds_ep_free_rc_channel(ep);
712 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
713 	ep->ep_segfbp = NULL;
714 	ep->ep_seglbp = NULL;
715 	mutex_exit(&ep->ep_lock);
716 
717 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
718 }
719 
720 /* Free the session */
721 static void
722 rds_destroy_session(rds_session_t *sp)
723 {
724 	rds_ep_t	*ep;
725 	rds_bufpool_t	*pool;
726 
727 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
728 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
729 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
730 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
731 
732 	rw_enter(&sp->session_lock, RW_READER);
733 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
734 	    sp->session_state);
735 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
736 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
737 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
738 		rw_exit(&sp->session_lock);
739 		delay(drv_usectohz(1000000));
740 		rw_enter(&sp->session_lock, RW_READER);
741 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
742 		    "ON SESSION", sp, sp->session_state);
743 	}
744 	rw_exit(&sp->session_lock);
745 
746 	/* data channel */
747 	ep = &sp->session_dataep;
748 
749 	/* send pool locks */
750 	pool = &ep->ep_sndpool;
751 	cv_destroy(&pool->pool_cv);
752 	mutex_destroy(&pool->pool_lock);
753 
754 	/* recv pool locks */
755 	pool = &ep->ep_rcvpool;
756 	cv_destroy(&pool->pool_cv);
757 	mutex_destroy(&pool->pool_lock);
758 	mutex_destroy(&ep->ep_recvqp.qp_lock);
759 
760 	/* control channel */
761 	ep = &sp->session_ctrlep;
762 
763 	/* send pool locks */
764 	pool = &ep->ep_sndpool;
765 	cv_destroy(&pool->pool_cv);
766 	mutex_destroy(&pool->pool_lock);
767 
768 	/* recv pool locks */
769 	pool = &ep->ep_rcvpool;
770 	cv_destroy(&pool->pool_cv);
771 	mutex_destroy(&pool->pool_lock);
772 	mutex_destroy(&ep->ep_recvqp.qp_lock);
773 
774 	/* session */
775 	rw_destroy(&sp->session_lock);
776 	rw_destroy(&sp->session_local_portmap_lock);
777 	rw_destroy(&sp->session_remote_portmap_lock);
778 
779 	/* free the session */
780 	kmem_free(sp, sizeof (rds_session_t));
781 
782 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
783 }
784 
785 /* This is called on the taskq thread */
786 static void
787 rds_failover_session(void *arg)
788 {
789 	rds_session_t	*sp = (rds_session_t *)arg;
790 	ib_gid_t	lgid, rgid;
791 	ipaddr_t	myip, remip;
792 	int		ret, cnt = 0;
793 
794 	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);
795 
796 	/* Make sure the session is still alive */
797 	if (rds_session_lkup_by_sp(sp) == B_FALSE) {
798 		RDS_DPRINTF2("rds_failover_session",
799 		    "Return: SP(%p) not ALIVE", sp);
800 		return;
801 	}
802 
803 	RDS_INCR_FAILOVERS();
804 
805 	rw_enter(&sp->session_lock, RW_WRITER);
806 	if (sp->session_type != RDS_SESSION_ACTIVE) {
807 		/*
808 		 * The remote side must have seen the error and initiated
809 		 * a re-connect.
810 		 */
811 		RDS_DPRINTF2("rds_failover_session",
812 		    "SP(%p) has become passive", sp);
813 		rw_exit(&sp->session_lock);
814 		return;
815 	}
816 	sp->session_failover = 1;
817 	rw_exit(&sp->session_lock);
818 
819 	/*
820 	 * The session is in ERROR state but close both channels
821 	 * for a clean start.
822 	 */
823 	rds_session_close(sp, IBT_BLOCKING, 1);
824 
825 	/* wait 1 sec before re-connecting */
826 	delay(drv_usectohz(1000000));
827 
828 	do {
829 		ibt_ip_path_attr_t	ipattr;
830 		ibt_ip_addr_t		dstip;
831 
832 		/* The ipaddr should be in the network order */
833 		myip = sp->session_myip;
834 		remip = sp->session_remip;
835 		ret = rds_sc_path_lookup(&myip, &remip);
836 		if (ret == 0) {
837 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
838 			    myip, remip);
839 		}
840 		/* check if we have (new) path from the source to destination */
841 		lgid.gid_prefix = 0;
842 		lgid.gid_guid = 0;
843 		rgid.gid_prefix = 0;
844 		rgid.gid_guid = 0;
845 
846 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
847 		dstip.family = AF_INET;
848 		dstip.un.ip4addr = htonl(remip);
849 		ipattr.ipa_dst_ip = &dstip;
850 		ipattr.ipa_src_ip.family = AF_INET;
851 		ipattr.ipa_src_ip.un.ip4addr = htonl(myip);
852 		ipattr.ipa_ndst = 1;
853 		ipattr.ipa_max_paths = 1;
854 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
855 		    myip, remip);
856 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
857 		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
858 		if (ret == IBT_SUCCESS) {
859 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
860 			lgid = sp->session_pinfo.
861 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
862 			rgid = sp->session_pinfo.
863 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
864 			break;
865 		}
866 
867 		RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);
868 
869 		/* wait 1 sec before re-trying */
870 		delay(drv_usectohz(1000000));
871 		cnt++;
872 	} while (cnt < 5);
873 
874 	if (ret != IBT_SUCCESS) {
875 		rw_enter(&sp->session_lock, RW_WRITER);
876 		if (sp->session_type == RDS_SESSION_ACTIVE) {
877 			rds_session_fini(sp);
878 			sp->session_state = RDS_SESSION_STATE_FAILED;
879 			sp->session_failover = 0;
880 			RDS_DPRINTF3("rds_failover_session",
881 			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
882 		} else {
883 			RDS_DPRINTF2("rds_failover_session",
884 			    "SP(%p) has become passive", sp);
885 		}
886 		rw_exit(&sp->session_lock);
887 		return;
888 	}
889 
890 	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
891 	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
892 	    rgid.gid_guid);
893 
894 	rw_enter(&sp->session_lock, RW_WRITER);
895 	if (sp->session_type != RDS_SESSION_ACTIVE) {
896 		/*
897 		 * The remote side must have seen the error and initiated
898 		 * a re-connect.
899 		 */
900 		RDS_DPRINTF2("rds_failover_session",
901 		    "SP(%p) has become passive", sp);
902 		rw_exit(&sp->session_lock);
903 		return;
904 	}
905 
906 	/* move the session to init state */
907 	ret = rds_session_reinit(sp, lgid);
908 	sp->session_lgid = lgid;
909 	sp->session_rgid = rgid;
910 	if (ret != 0) {
911 		rds_session_fini(sp);
912 		sp->session_state = RDS_SESSION_STATE_FAILED;
913 		sp->session_failover = 0;
914 		RDS_DPRINTF3("rds_failover_session",
915 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
916 		rw_exit(&sp->session_lock);
917 		return;
918 	} else {
919 		sp->session_state = RDS_SESSION_STATE_INIT;
920 		RDS_DPRINTF3("rds_failover_session",
921 		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
922 	}
923 	rw_exit(&sp->session_lock);
924 
925 	rds_session_open(sp);
926 
927 	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
928 }
929 
930 void
931 rds_handle_send_error(rds_ep_t *ep)
932 {
933 	if (rds_is_sendq_empty(ep, 0)) {
934 		/* Session should already be in ERROR, try to reconnect */
935 		RDS_DPRINTF2("rds_handle_send_error",
936 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
937 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
938 		    (void *)ep->ep_sp, DDI_SLEEP);
939 	}
940 }
941 
942 /*
943  * Called in the CM handler on the passive side
944  * Called on a taskq thread.
945  */
946 void
947 rds_cleanup_passive_session(void *arg)
948 {
949 	rds_session_t	*sp = arg;
950 
951 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
952 	    sp->session_state);
953 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
954 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
955 
956 	rds_session_close(sp, IBT_BLOCKING, 1);
957 
958 	rw_enter(&sp->session_lock, RW_WRITER);
959 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
960 		rds_session_fini(sp);
961 		sp->session_state = RDS_SESSION_STATE_FINI;
962 		sp->session_failover = 0;
963 		RDS_DPRINTF3("rds_cleanup_passive_session",
964 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
965 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
966 		rds_session_fini(sp);
967 		sp->session_state = RDS_SESSION_STATE_FAILED;
968 		sp->session_failover = 0;
969 		RDS_DPRINTF3("rds_cleanup_passive_session",
970 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
971 	}
972 	rw_exit(&sp->session_lock);
973 
974 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
975 }
976 
977 /*
978  * Called by the CM handler on the passive side
979  * Called with WRITE lock on the session
980  */
981 void
982 rds_passive_session_fini(rds_session_t *sp)
983 {
984 	rds_ep_t	*ep;
985 
986 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
987 	    sp->session_state);
988 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
989 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
990 
991 	/* clean the data channel */
992 	ep = &sp->session_dataep;
993 	(void) rds_is_sendq_empty(ep, 1);
994 	mutex_enter(&ep->ep_lock);
995 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
996 	    ep->ep_state);
997 	rds_ep_free_rc_channel(ep);
998 	mutex_exit(&ep->ep_lock);
999 
1000 	/* clean the control channel */
1001 	ep = &sp->session_ctrlep;
1002 	(void) rds_is_sendq_empty(ep, 1);
1003 	mutex_enter(&ep->ep_lock);
1004 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
1005 	    ep->ep_state);
1006 	rds_ep_free_rc_channel(ep);
1007 	mutex_exit(&ep->ep_lock);
1008 
1009 	rds_session_fini(sp);
1010 	sp->session_failover = 0;
1011 
1012 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
1013 }
1014 
1015 /*
1016  * Can be called:
1017  * 1. on driver detach
1018  * 2. on taskq thread
1019  * arg is always NULL
1020  */
1021 /* ARGSUSED */
1022 void
1023 rds_close_sessions(void *arg)
1024 {
1025 	rds_session_t *sp, *spnextp;
1026 
1027 	RDS_DPRINTF2("rds_close_sessions", "Enter");
1028 
1029 	/* wait until all the buffers are freed by the sockets */
1030 	while (RDS_GET_RXPKTS_PEND() != 0) {
1031 		/* wait one second and try again */
1032 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1033 		    "pending packets", RDS_GET_RXPKTS_PEND());
1034 		delay(drv_usectohz(1000000));
1035 	}
1036 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1037 
1038 	/* close all the sessions */
1039 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1040 	sp = rdsib_statep->rds_sessionlistp;
1041 	while (sp) {
1042 		rw_enter(&sp->session_lock, RW_WRITER);
1043 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1044 		    sp->session_state);
1045 
1046 		switch (sp->session_state) {
1047 		case RDS_SESSION_STATE_CONNECTED:
1048 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1049 			rw_exit(&sp->session_lock);
1050 
1051 			rds_session_close(sp, IBT_BLOCKING, 2);
1052 
1053 			rw_enter(&sp->session_lock, RW_WRITER);
1054 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1055 			RDS_DPRINTF3("rds_close_sessions",
1056 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1057 			rds_session_fini(sp);
1058 			sp->session_state = RDS_SESSION_STATE_FINI;
1059 			sp->session_failover = 0;
1060 			RDS_DPRINTF3("rds_close_sessions",
1061 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1062 			break;
1063 
1064 		case RDS_SESSION_STATE_ERROR:
1065 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
1066 		case RDS_SESSION_STATE_INIT:
1067 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1068 			rw_exit(&sp->session_lock);
1069 
1070 			rds_session_close(sp, IBT_BLOCKING, 1);
1071 
1072 			rw_enter(&sp->session_lock, RW_WRITER);
1073 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1074 			RDS_DPRINTF3("rds_close_sessions",
1075 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1076 			/* FALLTHRU */
1077 		case RDS_SESSION_STATE_CLOSED:
1078 			rds_session_fini(sp);
1079 			sp->session_state = RDS_SESSION_STATE_FINI;
1080 			sp->session_failover = 0;
1081 			RDS_DPRINTF3("rds_close_sessions",
1082 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1083 			break;
1084 		}
1085 
1086 		rw_exit(&sp->session_lock);
1087 		sp = sp->session_nextp;
1088 	}
1089 
1090 	sp = rdsib_statep->rds_sessionlistp;
1091 	rdsib_statep->rds_sessionlistp = NULL;
1092 	rdsib_statep->rds_nsessions = 0;
1093 	rw_exit(&rdsib_statep->rds_sessionlock);
1094 
1095 	while (sp) {
1096 		spnextp = sp->session_nextp;
1097 		rds_destroy_session(sp);
1098 		RDS_DECR_SESS();
1099 		sp = spnextp;
1100 	}
1101 
1102 	/* free the global pool */
1103 	rds_free_recv_caches(rdsib_statep);
1104 
1105 	RDS_DPRINTF2("rds_close_sessions", "Return");
1106 }
1107 
1108 void
1109 rds_session_open(rds_session_t *sp)
1110 {
1111 	int		ret;
1112 
1113 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1114 
1115 	ret = rds_session_connect(sp);
1116 	if (ret == -1) {
1117 		/*
1118 		 * may be the session has become passive due to
1119 		 * hitting peer-to-peer case
1120 		 */
1121 		rw_enter(&sp->session_lock, RW_READER);
1122 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1123 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1124 			    "has become passive from active", sp);
1125 			rw_exit(&sp->session_lock);
1126 			return;
1127 		}
1128 
1129 		/* get the lock for writing */
1130 		rw_exit(&sp->session_lock);
1131 		rw_enter(&sp->session_lock, RW_WRITER);
1132 		sp->session_state = RDS_SESSION_STATE_ERROR;
1133 		RDS_DPRINTF3("rds_session_open",
1134 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1135 		rw_exit(&sp->session_lock);
1136 
1137 		/* Connect request failed */
1138 		rds_session_close(sp, IBT_BLOCKING, 1);
1139 
1140 		rw_enter(&sp->session_lock, RW_WRITER);
1141 		rds_session_fini(sp);
1142 		sp->session_state = RDS_SESSION_STATE_FAILED;
1143 		sp->session_failover = 0;
1144 		RDS_DPRINTF3("rds_session_open",
1145 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1146 		rw_exit(&sp->session_lock);
1147 
1148 		return;
1149 	}
1150 
1151 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1152 }
1153 
1154 /*
1155  * Creates a session and inserts it into the list of sessions. The session
1156  * state would be CREATED.
1157  * Return Values:
1158  *	EWOULDBLOCK
1159  */
1160 rds_session_t *
1161 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1162     ibt_cm_req_rcv_t *reqp, uint8_t type)
1163 {
1164 	ib_gid_t	lgid, rgid;
1165 	rds_session_t	*newp, *oldp;
1166 	rds_ep_t	*dataep, *ctrlep;
1167 	rds_bufpool_t	*pool;
1168 	int		ret;
1169 
1170 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x",
1171 	    statep, localip, remip);
1172 
1173 	/* Allocate and initialize global buffer pool */
1174 	ret = rds_init_recv_caches(statep);
1175 	if (ret != 0) {
1176 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1177 		return (NULL);
1178 	}
1179 
1180 	/* enough memory for session (includes 2 endpoints) */
1181 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1182 
1183 	newp->session_remip = remip;
1184 	newp->session_myip = localip;
1185 	newp->session_type = type;
1186 	newp->session_state = RDS_SESSION_STATE_CREATED;
1187 	RDS_DPRINTF3("rds_session_create",
1188 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1189 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1190 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1191 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1192 
1193 	/* Initialize data endpoint */
1194 	dataep = &newp->session_dataep;
1195 	dataep->ep_remip = newp->session_remip;
1196 	dataep->ep_myip = newp->session_myip;
1197 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1198 	dataep->ep_sp = newp;
1199 	dataep->ep_type = RDS_EP_TYPE_DATA;
1200 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1201 
1202 	/* Initialize send pool locks */
1203 	pool = &dataep->ep_sndpool;
1204 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1205 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1206 
1207 	/* Initialize recv pool locks */
1208 	pool = &dataep->ep_rcvpool;
1209 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1210 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1211 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1212 
1213 	/* Initialize control endpoint */
1214 	ctrlep = &newp->session_ctrlep;
1215 	ctrlep->ep_remip = newp->session_remip;
1216 	ctrlep->ep_myip = newp->session_myip;
1217 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1218 	ctrlep->ep_sp = newp;
1219 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1220 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1221 
1222 	/* Initialize send pool locks */
1223 	pool = &ctrlep->ep_sndpool;
1224 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1225 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1226 
1227 	/* Initialize recv pool locks */
1228 	pool = &ctrlep->ep_rcvpool;
1229 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1230 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1231 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1232 
1233 	/* lkup if there is already a session */
1234 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1235 	oldp = rds_session_lkup(statep, remip, 0);
1236 	if (oldp != NULL) {
1237 		/* A session to this destination exists */
1238 		rw_exit(&statep->rds_sessionlock);
1239 		rw_destroy(&newp->session_lock);
1240 		rw_destroy(&newp->session_local_portmap_lock);
1241 		rw_destroy(&newp->session_remote_portmap_lock);
1242 		mutex_destroy(&dataep->ep_lock);
1243 		mutex_destroy(&ctrlep->ep_lock);
1244 		kmem_free(newp, sizeof (rds_session_t));
1245 		return (NULL);
1246 	}
1247 
1248 	/* Insert this session into the list */
1249 	rds_add_session(newp, B_TRUE);
1250 
1251 	/* unlock the session list */
1252 	rw_exit(&statep->rds_sessionlock);
1253 
1254 	if (type == RDS_SESSION_ACTIVE) {
1255 		ipaddr_t localip1, remip1;
1256 		ibt_ip_path_attr_t	ipattr;
1257 		ibt_ip_addr_t		dstip;
1258 
1259 		/* The ipaddr should be in the network order */
1260 		localip1 = localip;
1261 		remip1 = remip;
1262 		ret = rds_sc_path_lookup(&localip1, &remip1);
1263 		if (ret == 0) {
1264 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1265 			    localip, remip);
1266 		}
1267 
1268 		/* Get the gids for the source and destination ip addrs */
1269 		lgid.gid_prefix = 0;
1270 		lgid.gid_guid = 0;
1271 		rgid.gid_prefix = 0;
1272 		rgid.gid_guid = 0;
1273 
1274 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1275 		dstip.family = AF_INET;
1276 		dstip.un.ip4addr = ntohl(remip1);
1277 		ipattr.ipa_dst_ip = &dstip;
1278 		ipattr.ipa_src_ip.family = AF_INET;
1279 		ipattr.ipa_src_ip.un.ip4addr = ntohl(localip1);
1280 		ipattr.ipa_ndst = 1;
1281 		ipattr.ipa_max_paths = 1;
1282 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1283 		    localip1, remip1);
1284 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1285 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1286 		    NULL, NULL);
1287 		if (ret != IBT_SUCCESS) {
1288 			RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d "
1289 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1290 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1291 
1292 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1293 			return (NULL);
1294 		}
1295 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1296 		lgid =
1297 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1298 		rgid =
1299 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1300 
1301 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1302 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1303 		    rgid.gid_guid);
1304 	}
1305 
1306 	rw_enter(&newp->session_lock, RW_WRITER);
1307 	/* check for peer-to-peer case */
1308 	if (type == newp->session_type) {
1309 		/* no peer-to-peer case */
1310 		if (type == RDS_SESSION_ACTIVE) {
1311 			newp->session_lgid = lgid;
1312 			newp->session_rgid = rgid;
1313 		} else {
1314 			/* rgid is requester gid & lgid is receiver gid */
1315 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1316 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1317 		}
1318 	}
1319 	rw_exit(&newp->session_lock);
1320 
1321 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1322 
1323 	return (newp);
1324 }
1325 
1326 void
1327 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1328 {
1329 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1330 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1331 
1332 	switch (cpkt->rcp_code) {
1333 	case RDS_CTRL_CODE_STALL:
1334 		RDS_INCR_STALLS_RCVD();
1335 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1336 		break;
1337 	case RDS_CTRL_CODE_UNSTALL:
1338 		RDS_INCR_UNSTALLS_RCVD();
1339 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1340 		break;
1341 	case RDS_CTRL_CODE_STALL_PORTS:
1342 		rds_mark_all_ports(sp, RDS_REMOTE);
1343 		break;
1344 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1345 		rds_unmark_all_ports(sp, RDS_REMOTE);
1346 		break;
1347 	case RDS_CTRL_CODE_HEARTBEAT:
1348 		break;
1349 	default:
1350 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1351 		    cpkt->rcp_code);
1352 		break;
1353 	}
1354 
1355 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1356 }
1357 
1358 int
1359 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1360 {
1361 	ibt_send_wr_t	wr;
1362 	rds_ep_t	*ep;
1363 	rds_buf_t	*bp;
1364 	rds_ctrl_pkt_t	*cp;
1365 	int		ret;
1366 
1367 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1368 	    "Port: %d", sp, code, port);
1369 
1370 	ep = &sp->session_ctrlep;
1371 
1372 	bp = rds_get_send_buf(ep, 1);
1373 	if (bp == NULL) {
1374 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1375 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1376 		    port);
1377 		return (-1);
1378 	}
1379 
1380 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1381 	cp->rcp_code = code;
1382 	cp->rcp_port = port;
1383 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1384 
1385 	wr.wr_id = (uintptr_t)bp;
1386 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1387 	wr.wr_trans = IBT_RC_SRV;
1388 	wr.wr_opcode = IBT_WRC_SEND;
1389 	wr.wr_nds = 1;
1390 	wr.wr_sgl = &bp->buf_ds;
1391 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1392 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1393 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1394 	if (ret != IBT_SUCCESS) {
1395 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1396 		    "%d", ep, ret);
1397 		bp->buf_state = RDS_SNDBUF_FREE;
1398 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1399 		return (-1);
1400 	}
1401 
1402 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1403 	    "Port: %d", sp, code, port);
1404 
1405 	return (0);
1406 }
1407 
1408 void
1409 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1410 {
1411 	int		ret;
1412 
1413 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1414 
1415 	RDS_INCR_STALLS_TRIGGERED();
1416 
1417 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1418 
1419 		if (sp != NULL) {
1420 			ret = rds_post_control_message(sp,
1421 			    RDS_CTRL_CODE_STALL, port);
1422 			if (ret != 0) {
1423 				(void) rds_check_n_unmark_port(sp, port,
1424 				    qualifier);
1425 				return;
1426 			}
1427 			RDS_INCR_STALLS_SENT();
1428 		}
1429 	} else {
1430 		RDS_DPRINTF3(LABEL,
1431 		    "Port %d is already in stall state", port);
1432 	}
1433 
1434 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1435 }
1436 
1437 void
1438 rds_resume_port(in_port_t port)
1439 {
1440 	rds_session_t	*sp;
1441 	uint_t		ix;
1442 	int		ret;
1443 
1444 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1445 
1446 	RDS_INCR_UNSTALLS_TRIGGERED();
1447 
1448 	/* resume loopback traffic */
1449 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1450 
1451 	/* send unstall messages to resume the remote traffic */
1452 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1453 
1454 	sp = rdsib_statep->rds_sessionlistp;
1455 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1456 		ASSERT(sp != NULL);
1457 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1458 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1459 				ret = rds_post_control_message(sp,
1460 				    RDS_CTRL_CODE_UNSTALL, port);
1461 				if (ret != 0) {
1462 					(void) rds_check_n_mark_port(sp, port,
1463 					    RDS_LOCAL);
1464 				} else {
1465 					RDS_INCR_UNSTALLS_SENT();
1466 				}
1467 		}
1468 
1469 		sp = sp->session_nextp;
1470 	}
1471 
1472 	rw_exit(&rdsib_statep->rds_sessionlock);
1473 
1474 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1475 }
1476 
/*
 * Split the message described by uiop into one or more RDS data packets,
 * copy the payload into send buffers of endpoint ep and post them on the
 * endpoint's channel.
 *
 * Arguments:
 *	ep	 - endpoint whose send pool and channel are used
 *	uiop	 - source of the message; uio_resid is the message size
 *	sendport - stored in each packet header as dh_sendport
 *	recvport - stored in each packet header as dh_recvport
 *
 * Return Values:
 *	0	 - all packets were posted
 *	ENOBUFS	 - rds_get_send_buf() could not supply npkts buffers
 *	other	 - the non-zero value returned by uiomove(), or the
 *		   non-IBT_SUCCESS status returned by ibt_post_send()
 *
 * The size arithmetic below subtracts 1 from the message size, so a zero
 * length message must not reach this function; rds_sendmsg() returns
 * early for uio_resid == 0.
 */
static int
rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
    in_port_t recvport)
{
	ibt_send_wr_t	*wrp, wr;
	rds_buf_t	*bp, *bp1;
	rds_data_hdr_t	*pktp;
	uint32_t	msgsize, npkts, residual, pktno, ix;
	int		ret;

	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
	    ep, uiop);

	/*
	 * how many pkts are needed to carry this msg; residual is the
	 * payload size of the last packet (1 .. UserBufferSize)
	 */
	msgsize = uiop->uio_resid;
	npkts = ((msgsize - 1) / UserBufferSize) + 1;
	residual = ((msgsize - 1) % UserBufferSize) + 1;

	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
	    msgsize, npkts);

	/* Get the buffers needed to post this message */
	bp = rds_get_send_buf(ep, npkts);
	if (bp == NULL) {
		RDS_INCR_ENOBUFS();
		return (ENOBUFS);
	}

	if (npkts > 1) {
		/*
		 * multi-pkt messages are posted at the same time as a list
		 * of WRs; wrp is only allocated (and only used) in this case
		 */
		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
		    npkts, KM_SLEEP);
	}


	/* fill one buffer per packet, walking the buf_nextp chain */
	pktno = 0;
	bp1 = bp;
	do {
		/*
		 * prepare the header: assume a full packet for now,
		 * dh_npkts counts this packet plus the ones still to come
		 */
		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
		pktp->dh_datalen = UserBufferSize;
		pktp->dh_npkts = npkts - pktno;
		pktp->dh_psn = pktno;
		pktp->dh_sendport = sendport;
		pktp->dh_recvport = recvport;
		bp1->buf_ds.ds_len = RdsPktSize;

		/* copy the data right behind the header */
		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
		    UserBufferSize, UIO_WRITE, uiop);
		if (ret != 0) {
			break;
		}

		if (uiop->uio_resid == 0) {
			/* last packet: shrink lengths to the real payload */
			pktp->dh_datalen = residual;
			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
			break;
		}
		pktno++;
		bp1 = bp1->buf_nextp;
	} while (uiop->uio_resid);

	if (ret) {
		/* uiomove failed: give back everything acquired above */
		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
		    uiop, ret);
		if (npkts > 1) {
			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
		}
		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
		return (ret);
	}

	if (npkts > 1) {
		/* multi-pkt message */
		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);

		/* one WR per buffer, in chain order */
		bp1 = bp;
		for (ix = 0; ix < npkts; ix++) {
			wrp[ix].wr_id = (uintptr_t)bp1;
			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
			wrp[ix].wr_trans = IBT_RC_SRV;
			wrp[ix].wr_opcode = IBT_WRC_SEND;
			wrp[ix].wr_nds = 1;
			wrp[ix].wr_sgl = &bp1->buf_ds;
			bp1 = bp1->buf_nextp;
		}
		/* only the last packet of the message is sent solicited */
		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;

		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
		if (ret != IBT_SUCCESS) {
			/*
			 * NOTE(review): all npkts buffers are freed here
			 * regardless of the posted count returned in ix -
			 * confirm that a partial post cannot happen.
			 */
			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
			    "%d for %d pkts", ep, ret, npkts);
			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
			return (ret);
		}

		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
	} else {
		/* single pkt: use the on-stack WR */
		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
		wr.wr_id = (uintptr_t)bp;
		wr.wr_flags = IBT_WR_SEND_SOLICIT;
		wr.wr_trans = IBT_RC_SRV;
		wr.wr_opcode = IBT_WRC_SEND;
		wr.wr_nds = 1;
		wr.wr_sgl = &bp->buf_ds;
		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
		if (ret != IBT_SUCCESS) {
			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
			    "%d", ep, ret);
			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
			return (ret);
		}
	}

	/* account for the transmitted packets and bytes */
	RDS_INCR_TXPKTS(npkts);
	RDS_INCR_TXBYTES(msgsize);

	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
	    ep, uiop);

	return (0);
}
1608 
1609 static int
1610 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1611     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1612 {
1613 	mblk_t		*mp;
1614 	int		ret;
1615 
1616 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1617 
1618 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1619 	    "%d to recvport: %d", sendport, recvport);
1620 
1621 	mp = allocb(uiop->uio_resid, BPRI_MED);
1622 	if (mp == NULL) {
1623 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1624 		    uiop->uio_resid);
1625 		return (ENOSPC);
1626 	}
1627 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1628 
1629 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1630 	if (ret) {
1631 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1632 		freeb(mp);
1633 		return (ret);
1634 	}
1635 
1636 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1637 	    zoneid);
1638 	if (ret != 0) {
1639 		if (ret == ENOSPC) {
1640 			/*
1641 			 * The message is delivered but cannot take more,
1642 			 * stop further loopback traffic to this port
1643 			 */
1644 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1645 			    "Port %d NO SPACE", recvport);
1646 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1647 		} else {
1648 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1649 			    "port %d failed: %d", sendport, recvport, ret);
1650 			return (ret);
1651 		}
1652 	}
1653 
1654 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1655 	return (0);
1656 }
1657 
1658 static void
1659 rds_resend_messages(void *arg)
1660 {
1661 	rds_session_t	*sp = (rds_session_t *)arg;
1662 	rds_ep_t	*ep;
1663 	rds_bufpool_t	*spool;
1664 	rds_buf_t	*bp, *endp, *tmp;
1665 	ibt_send_wr_t	*wrp;
1666 	uint_t		nwr = 0, ix, jx;
1667 	int		ret;
1668 
1669 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1670 
1671 	ep = &sp->session_dataep;
1672 
1673 	spool = &ep->ep_sndpool;
1674 	mutex_enter(&spool->pool_lock);
1675 
1676 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1677 
1678 	if (ep->ep_lbufid == NULL) {
1679 		RDS_DPRINTF2("rds_resend_messages",
1680 		    "SP(%p) Remote session is cleaned up ", sp);
1681 		/*
1682 		 * The remote end cleaned up its session. There may be loss
1683 		 * of messages. Mark all buffers as acknowledged.
1684 		 */
1685 		tmp = spool->pool_tailp;
1686 	} else {
1687 		tmp = (rds_buf_t *)ep->ep_lbufid;
1688 		RDS_DPRINTF2("rds_resend_messages",
1689 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1690 	}
1691 
1692 	endp = spool->pool_tailp;
1693 	bp = spool->pool_headp;
1694 	jx = 0;
1695 	while ((bp != NULL) && (bp != tmp)) {
1696 		bp->buf_state = RDS_SNDBUF_FREE;
1697 		jx++;
1698 		bp = bp->buf_nextp;
1699 	}
1700 
1701 	if (bp == NULL) {
1702 		mutex_exit(&spool->pool_lock);
1703 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1704 		    "found in the list", tmp);
1705 
1706 		rw_enter(&sp->session_lock, RW_WRITER);
1707 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1708 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1709 		} else {
1710 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1711 			    "Expected State: %d", sp, sp->session_state,
1712 			    RDS_SESSION_STATE_CONNECTED);
1713 		}
1714 		sp->session_failover = 0;
1715 		rw_exit(&sp->session_lock);
1716 		return;
1717 	}
1718 
1719 	/* Found the match */
1720 	bp->buf_state = RDS_SNDBUF_FREE;
1721 	jx++;
1722 
1723 	spool->pool_tailp = bp;
1724 	bp = bp->buf_nextp;
1725 	spool->pool_tailp->buf_nextp = NULL;
1726 	nwr = spool->pool_nfree - jx;
1727 	spool->pool_nfree = jx;
1728 	mutex_exit(&spool->pool_lock);
1729 
1730 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1731 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1732 
1733 	if (bp) {
1734 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1735 		    KM_SLEEP);
1736 
1737 		while (nwr) {
1738 			jx = (nwr > 100) ? 100 : nwr;
1739 
1740 			tmp = bp;
1741 			for (ix = 0; ix < jx; ix++) {
1742 				bp->buf_state = RDS_SNDBUF_PENDING;
1743 				wrp[ix].wr_id = (uintptr_t)bp;
1744 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1745 				wrp[ix].wr_trans = IBT_RC_SRV;
1746 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1747 				wrp[ix].wr_nds = 1;
1748 				wrp[ix].wr_sgl = &bp->buf_ds;
1749 				bp = bp->buf_nextp;
1750 			}
1751 
1752 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1753 			if (ret != IBT_SUCCESS) {
1754 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1755 				    "failed: %d for % pkts", ep, ret, jx);
1756 				break;
1757 			}
1758 
1759 			mutex_enter(&spool->pool_lock);
1760 			spool->pool_nbusy += jx;
1761 			mutex_exit(&spool->pool_lock);
1762 
1763 			nwr -= jx;
1764 		}
1765 
1766 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1767 
1768 		if (nwr != 0) {
1769 
1770 			/*
1771 			 * An error while failover is in progress. Some WRs are
1772 			 * posted while other remain. If any of the posted WRs
1773 			 * complete in error then they would dispatch a taskq to
1774 			 * do a failover. Getting the session lock will prevent
1775 			 * the taskq to wait until we are done here.
1776 			 */
1777 			rw_enter(&sp->session_lock, RW_READER);
1778 
1779 			/*
1780 			 * Wait until all the previous WRs are completed and
1781 			 * then queue the remaining, otherwise the order of
1782 			 * the messages may change.
1783 			 */
1784 			(void) rds_is_sendq_empty(ep, 1);
1785 
1786 			/* free the remaining buffers */
1787 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1788 
1789 			rw_exit(&sp->session_lock);
1790 			return;
1791 		}
1792 	}
1793 
1794 	rw_enter(&sp->session_lock, RW_WRITER);
1795 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1796 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1797 	} else {
1798 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1799 		    "Expected State: %d", sp, sp->session_state,
1800 		    RDS_SESSION_STATE_CONNECTED);
1801 	}
1802 	sp->session_failover = 0;
1803 	rw_exit(&sp->session_lock);
1804 
1805 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1806 }
1807 
1808 /*
1809  * This is called when a channel is connected. Transition the session to
1810  * CONNECTED state iff both channels are connected.
1811  */
1812 void
1813 rds_session_active(rds_session_t *sp)
1814 {
1815 	rds_ep_t	*ep;
1816 	uint_t		failover;
1817 
1818 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1819 
1820 	rw_enter(&sp->session_lock, RW_READER);
1821 
1822 	failover = sp->session_failover;
1823 
1824 	/*
1825 	 * we establish the data channel first, so check the control channel
1826 	 * first but make sure it is initialized.
1827 	 */
1828 	ep = &sp->session_ctrlep;
1829 	mutex_enter(&ep->ep_lock);
1830 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1831 		/* the session is not ready yet */
1832 		mutex_exit(&ep->ep_lock);
1833 		rw_exit(&sp->session_lock);
1834 		return;
1835 	}
1836 	mutex_exit(&ep->ep_lock);
1837 
1838 	/* control channel is connected, check the data channel */
1839 	ep = &sp->session_dataep;
1840 	mutex_enter(&ep->ep_lock);
1841 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1842 		/* data channel is not yet connected */
1843 		mutex_exit(&ep->ep_lock);
1844 		rw_exit(&sp->session_lock);
1845 		return;
1846 	}
1847 	mutex_exit(&ep->ep_lock);
1848 
1849 	if (failover) {
1850 		rw_exit(&sp->session_lock);
1851 
1852 		/*
1853 		 * The session has failed over. Previous msgs have to be
1854 		 * re-sent before the session is moved to the connected
1855 		 * state.
1856 		 */
1857 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1858 		    "to re-send messages", sp);
1859 		(void) ddi_taskq_dispatch(rds_taskq,
1860 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1861 		return;
1862 	}
1863 
1864 	/* the session is ready */
1865 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1866 	RDS_DPRINTF3("rds_session_active",
1867 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1868 
1869 	rw_exit(&sp->session_lock);
1870 
1871 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1872 }
1873 
1874 static int
1875 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1876     in_port_t recvport)
1877 {
1878 	int	ret;
1879 
1880 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1881 	    "%d", ep, sendport, recvport);
1882 
1883 	/* make sure the remote port is not stalled */
1884 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1885 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1886 		    ep->ep_sp, recvport);
1887 		RDS_INCR_EWOULDBLOCK();
1888 		ret = ENOMEM;
1889 	} else {
1890 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1891 	}
1892 
1893 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1894 
1895 	return (ret);
1896 }
1897 
1898 /* Send a message to a destination socket */
1899 int
1900 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1901     in_port_t recvport, zoneid_t zoneid)
1902 {
1903 	rds_session_t	*sp;
1904 	ib_gid_t	lgid, rgid;
1905 	int		ret;
1906 
1907 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1908 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1909 	    sendport, recvport);
1910 
1911 	/* If msg length is 0, just return success */
1912 	if (uiop->uio_resid == 0) {
1913 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1914 		return (0);
1915 	}
1916 
1917 	/* Is there a session to the destination? */
1918 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1919 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1920 	rw_exit(&rdsib_statep->rds_sessionlock);
1921 
1922 	/* Is this a loopback message? */
1923 	if ((sp == NULL) && (rds_islocal(recvip))) {
1924 		/* make sure the port is not stalled */
1925 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
1926 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1927 			    recvport);
1928 			RDS_INCR_EWOULDBLOCK();
1929 			return (ENOMEM);
1930 		}
1931 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1932 		    sendport, zoneid);
1933 		return (ret);
1934 	}
1935 
1936 	/* Not a loopback message */
1937 	if (sp == NULL) {
1938 		/* There is no session to the destination, create one. */
1939 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1940 		    "IP: 0x%x", recvip);
1941 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1942 		    RDS_SESSION_ACTIVE);
1943 		if (sp != NULL) {
1944 			rw_enter(&sp->session_lock, RW_WRITER);
1945 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1946 				ret = rds_session_init(sp);
1947 				if (ret != 0) {
1948 					RDS_DPRINTF2("rds_sendmsg",
1949 					    "SP(%p): rds_session_init failed",
1950 					    sp);
1951 					sp->session_state =
1952 					    RDS_SESSION_STATE_FAILED;
1953 					RDS_DPRINTF3("rds_sendmsg",
1954 					    "SP(%p) State "
1955 					    "RDS_SESSION_STATE_FAILED", sp);
1956 					rw_exit(&sp->session_lock);
1957 					return (EFAULT);
1958 				}
1959 				sp->session_state = RDS_SESSION_STATE_INIT;
1960 				RDS_DPRINTF3("rds_sendmsg",
1961 				    "SP(%p) State "
1962 				    "RDS_SESSION_STATE_INIT", sp);
1963 				rw_exit(&sp->session_lock);
1964 				rds_session_open(sp);
1965 			} else {
1966 				rw_exit(&sp->session_lock);
1967 			}
1968 		} else {
1969 			/* Is a session created for this destination */
1970 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1971 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
1972 			rw_exit(&rdsib_statep->rds_sessionlock);
1973 			if (sp == NULL) {
1974 				return (EFAULT);
1975 			}
1976 		}
1977 	}
1978 
1979 	/* There is a session to the destination */
1980 	rw_enter(&sp->session_lock, RW_READER);
1981 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1982 		rw_exit(&sp->session_lock);
1983 
1984 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
1985 		    recvport);
1986 		return (ret);
1987 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1988 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1989 		ipaddr_t sendip1, recvip1;
1990 
1991 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
1992 		    "%d", sp);
1993 		rw_exit(&sp->session_lock);
1994 		rw_enter(&sp->session_lock, RW_WRITER);
1995 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1996 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1997 			ibt_ip_path_attr_t	ipattr;
1998 			ibt_ip_addr_t		dstip;
1999 
2000 			sp->session_state = RDS_SESSION_STATE_CREATED;
2001 			sp->session_type = RDS_SESSION_ACTIVE;
2002 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
2003 			    "RDS_SESSION_STATE_CREATED", sp);
2004 			rw_exit(&sp->session_lock);
2005 
2006 
2007 			/* The ipaddr should be in the network order */
2008 			sendip1 = sendip;
2009 			recvip1 = recvip;
2010 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
2011 			if (ret == 0) {
2012 				RDS_DPRINTF2(LABEL, "Path not found "
2013 				    "(0x%x 0x%x)", sendip1, recvip1);
2014 			}
2015 
2016 			/* Resolve the IP addresses */
2017 			lgid.gid_prefix = 0;
2018 			lgid.gid_guid = 0;
2019 			rgid.gid_prefix = 0;
2020 			rgid.gid_guid = 0;
2021 
2022 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
2023 			dstip.family = AF_INET;
2024 			dstip.un.ip4addr = htonl(recvip1);
2025 			ipattr.ipa_dst_ip = &dstip;
2026 			ipattr.ipa_src_ip.family = AF_INET;
2027 			ipattr.ipa_src_ip.un.ip4addr = htonl(sendip1);
2028 			ipattr.ipa_ndst = 1;
2029 			ipattr.ipa_max_paths = 1;
2030 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2031 			    sendip1, recvip1);
2032 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2033 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2034 			    NULL, NULL);
2035 			if (ret != IBT_SUCCESS) {
2036 				RDS_DPRINTF1("rds_sendmsg",
2037 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2038 
2039 				rw_enter(&sp->session_lock, RW_WRITER);
2040 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2041 					sp->session_state =
2042 					    RDS_SESSION_STATE_FAILED;
2043 					RDS_DPRINTF3("rds_sendmsg",
2044 					    "SP(%p) State "
2045 					    "RDS_SESSION_STATE_FAILED", sp);
2046 					rw_exit(&sp->session_lock);
2047 					return (EFAULT);
2048 				} else {
2049 					rw_exit(&sp->session_lock);
2050 					return (ENOMEM);
2051 				}
2052 			}
2053 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2054 			lgid = sp->session_pinfo.
2055 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2056 			rgid = sp->session_pinfo.
2057 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2058 
2059 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2060 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2061 			    rgid.gid_guid);
2062 
2063 			rw_enter(&sp->session_lock, RW_WRITER);
2064 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2065 				sp->session_lgid = lgid;
2066 				sp->session_rgid = rgid;
2067 				ret = rds_session_init(sp);
2068 				if (ret != 0) {
2069 					RDS_DPRINTF2("rds_sendmsg",
2070 					    "SP(%p): rds_session_init failed",
2071 					    sp);
2072 					sp->session_state =
2073 					    RDS_SESSION_STATE_FAILED;
2074 					RDS_DPRINTF3("rds_sendmsg",
2075 					    "SP(%p) State "
2076 					    "RDS_SESSION_STATE_FAILED", sp);
2077 					rw_exit(&sp->session_lock);
2078 					return (EFAULT);
2079 				}
2080 				sp->session_state = RDS_SESSION_STATE_INIT;
2081 				rw_exit(&sp->session_lock);
2082 
2083 				rds_session_open(sp);
2084 
2085 			} else {
2086 				RDS_DPRINTF2("rds_sendmsg",
2087 				    "SP(%p): type changed to %d",
2088 				    sp, sp->session_type);
2089 				rw_exit(&sp->session_lock);
2090 				return (ENOMEM);
2091 			}
2092 		} else {
2093 			RDS_DPRINTF2("rds_sendmsg",
2094 			    "SP(%p): Session state %d changed",
2095 			    sp, sp->session_state);
2096 			rw_exit(&sp->session_lock);
2097 			return (ENOMEM);
2098 		}
2099 	} else {
2100 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): Session is in %d state",
2101 		    sp, sp->session_state);
2102 		rw_exit(&sp->session_lock);
2103 		return (ENOMEM);
2104 	}
2105 
2106 	rw_enter(&sp->session_lock, RW_READER);
2107 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2108 		rw_exit(&sp->session_lock);
2109 
2110 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2111 		    recvport);
2112 	} else {
2113 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2114 		    sp, sp->session_state);
2115 		rw_exit(&sp->session_lock);
2116 	}
2117 
2118 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2119 
2120 	return (ret);
2121 }
2122 
2123 /* Note: This is called on the CQ handler thread */
2124 void
2125 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2126 {
2127 	mblk_t		*mp, *mp1;
2128 	rds_data_hdr_t	*pktp, *pktp1;
2129 	uint8_t		*datap;
2130 	rds_buf_t	*bp1;
2131 	rds_bufpool_t	*rpool;
2132 	uint_t		npkts, ix;
2133 	int		ret;
2134 
2135 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2136 
2137 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2138 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2139 	npkts = pktp->dh_npkts;
2140 
2141 	/* increment rx pending here */
2142 	rpool = &ep->ep_rcvpool;
2143 	mutex_enter(&rpool->pool_lock);
2144 	rpool->pool_nbusy += npkts;
2145 	mutex_exit(&rpool->pool_lock);
2146 
2147 	/* this will get freed by sockfs */
2148 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2149 	if (mp == NULL) {
2150 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2151 		    ep, bp);
2152 		rds_free_recv_buf(bp, npkts);
2153 		return;
2154 	}
2155 	mp->b_wptr = datap + pktp->dh_datalen;
2156 	mp->b_datap->db_type = M_DATA;
2157 
2158 	mp1 = mp;
2159 	bp1 = bp->buf_nextp;
2160 	while (bp1 != NULL) {
2161 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2162 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2163 		    RDS_DATA_HDR_SZ;
2164 
2165 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2166 		    BPRI_HI, &bp1->buf_frtn);
2167 		if (mp1->b_cont == NULL) {
2168 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2169 			    ep, bp1);
2170 			freemsg(mp);
2171 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2172 			return;
2173 		}
2174 		mp1 = mp1->b_cont;
2175 		mp1->b_wptr = datap + pktp1->dh_datalen;
2176 		mp1->b_datap->db_type = M_DATA;
2177 
2178 		bp1 = bp1->buf_nextp;
2179 	}
2180 
2181 	RDS_INCR_RXPKTS_PEND(npkts);
2182 	RDS_INCR_RXPKTS(npkts);
2183 	RDS_INCR_RXBYTES(msgdsize(mp));
2184 
2185 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2186 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2187 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2188 	    npkts, pktp->dh_psn);
2189 
2190 	/* store the last buffer id, no lock needed */
2191 	if (npkts > 1) {
2192 		ep->ep_rbufid = pktp1->dh_bufid;
2193 	} else {
2194 		ep->ep_rbufid = pktp->dh_bufid;
2195 	}
2196 
2197 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2198 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2199 	if (ret != 0) {
2200 		if (ret == ENOSPC) {
2201 			/*
2202 			 * The message is delivered but cannot take more,
2203 			 * stop further remote messages coming to this port
2204 			 */
2205 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2206 			    pktp->dh_recvport);
2207 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2208 		} else {
2209 			RDS_DPRINTF1(LABEL, "rds_deliver_new_msg returned: %d",
2210 			    ret);
2211 		}
2212 	}
2213 
2214 	mutex_enter(&ep->ep_lock);
2215 	/* ep_chanhdl can be null if conn est hasn't come yet */
2216 	if ((ep->ep_rdmacnt == 0) && (ep->ep_chanhdl != NULL)) {
2217 		ep->ep_rdmacnt++;
2218 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2219 		mutex_exit(&ep->ep_lock);
2220 
2221 		/* send acknowledgement */
2222 		RDS_INCR_TXACKS();
2223 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2224 		if (ret != IBT_SUCCESS) {
2225 			RDS_DPRINTF1(LABEL, "EP(%p): ibt_post_send for "
2226 			    "acknowledgement failed: %d, SQ depth: %d",
2227 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2228 			mutex_enter(&ep->ep_lock);
2229 			ep->ep_rdmacnt--;
2230 			mutex_exit(&ep->ep_lock);
2231 		}
2232 	} else {
2233 		/* no room to send acknowledgement */
2234 		mutex_exit(&ep->ep_lock);
2235 	}
2236 
2237 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2238 }
2239