xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rds/rdsib_ep.c (revision 437220cd296f6d8b6654d6d52508b40b1e2d1ac7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *	- Redistributions of source code must retain the above
39  *	  copyright notice, this list of conditions and the following
40  *	  disclaimer.
41  *
42  *	- Redistributions in binary form must reproduce the above
43  *	  copyright notice, this list of conditions and the following
44  *	  disclaimer in the documentation and/or other materials
45  *	  provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 /*
58  * Sun elects to include this software in Sun product
59  * under the OpenIB BSD license.
60  *
61  *
62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
72  * POSSIBILITY OF SUCH DAMAGE.
73  */
74 
75 #pragma ident	"%Z%%M%	%I%	%E% SMI"
76 
77 #include <sys/stream.h>
78 #include <sys/ib/clients/rds/rdsib_cm.h>
79 #include <sys/ib/clients/rds/rdsib_ib.h>
80 #include <sys/ib/clients/rds/rdsib_buf.h>
81 #include <sys/ib/clients/rds/rdsib_ep.h>
82 #include <sys/ib/clients/rds/rds_kstat.h>
83 #include <sys/zone.h>
84 
85 #define	RDS_POLL_CQ_IN_2TICKS	1
86 
87 /*
 * This file contains the endpoint-related calls.
89  */
90 
91 extern boolean_t rds_islocal(ipaddr_t addr);
92 extern uint_t rds_wc_signal;
93 
94 #define	RDS_LOOPBACK	0
95 #define	RDS_LOCAL	1
96 #define	RDS_REMOTE	2
97 
98 #define	IBT_IPADDR	1
99 
100 static uint8_t
101 rds_is_port_marked(rds_session_t *sp, in_port_t port, uint_t qualifier)
102 {
103 	uint8_t	ret;
104 
105 	switch (qualifier) {
106 	case RDS_LOOPBACK: /* loopback */
107 		rw_enter(&rds_loopback_portmap_lock, RW_READER);
108 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
109 		rw_exit(&rds_loopback_portmap_lock);
110 		break;
111 
112 	case RDS_LOCAL: /* Session local */
113 		ASSERT(sp != NULL);
114 		rw_enter(&sp->session_local_portmap_lock, RW_READER);
115 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
116 		rw_exit(&sp->session_local_portmap_lock);
117 		break;
118 
119 	case RDS_REMOTE: /* Session remote */
120 		ASSERT(sp != NULL);
121 		rw_enter(&sp->session_remote_portmap_lock, RW_READER);
122 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
123 		rw_exit(&sp->session_remote_portmap_lock);
124 		break;
125 	}
126 
127 	return (ret);
128 }
129 
130 static uint8_t
131 rds_check_n_mark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
132 {
133 	uint8_t	ret;
134 
135 	switch (qualifier) {
136 	case RDS_LOOPBACK: /* loopback */
137 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
138 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
139 		if (!ret) {
140 			/* port is not marked, mark it */
141 			rds_loopback_portmap[port/8] =
142 			    rds_loopback_portmap[port/8] | (1 << (port % 8));
143 		}
144 		rw_exit(&rds_loopback_portmap_lock);
145 		break;
146 
147 	case RDS_LOCAL: /* Session local */
148 		ASSERT(sp != NULL);
149 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
150 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
151 		if (!ret) {
152 			/* port is not marked, mark it */
153 			sp->session_local_portmap[port/8] =
154 			    sp->session_local_portmap[port/8] |
155 			    (1 << (port % 8));
156 		}
157 		rw_exit(&sp->session_local_portmap_lock);
158 		break;
159 
160 	case RDS_REMOTE: /* Session remote */
161 		ASSERT(sp != NULL);
162 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
163 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
164 		if (!ret) {
165 			/* port is not marked, mark it */
166 			sp->session_remote_portmap[port/8] =
167 			    sp->session_remote_portmap[port/8] |
168 			    (1 << (port % 8));
169 		}
170 		rw_exit(&sp->session_remote_portmap_lock);
171 		break;
172 	}
173 
174 	return (ret);
175 }
176 
177 static uint8_t
178 rds_check_n_unmark_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
179 {
180 	uint8_t	ret;
181 
182 	switch (qualifier) {
183 	case RDS_LOOPBACK: /* loopback */
184 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
185 		ret = (rds_loopback_portmap[port/8] & (1 << (port % 8)));
186 		if (ret) {
187 			/* port is marked, unmark it */
188 			rds_loopback_portmap[port/8] =
189 			    rds_loopback_portmap[port/8] & ~(1 << (port % 8));
190 		}
191 		rw_exit(&rds_loopback_portmap_lock);
192 		break;
193 
194 	case RDS_LOCAL: /* Session local */
195 		ASSERT(sp != NULL);
196 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
197 		ret = (sp->session_local_portmap[port/8] & (1 << (port % 8)));
198 		if (ret) {
199 			/* port is marked, unmark it */
200 			sp->session_local_portmap[port/8] =
201 			    sp->session_local_portmap[port/8] &
202 			    ~(1 << (port % 8));
203 		}
204 		rw_exit(&sp->session_local_portmap_lock);
205 		break;
206 
207 	case RDS_REMOTE: /* Session remote */
208 		ASSERT(sp != NULL);
209 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
210 		ret = (sp->session_remote_portmap[port/8] & (1 << (port % 8)));
211 		if (ret) {
212 			/* port is marked, unmark it */
213 			sp->session_remote_portmap[port/8] =
214 			    sp->session_remote_portmap[port/8] &
215 			    ~(1 << (port % 8));
216 		}
217 		rw_exit(&sp->session_remote_portmap_lock);
218 		break;
219 	}
220 
221 	return (ret);
222 }
223 
224 static void
225 rds_mark_all_ports(rds_session_t *sp, uint_t qualifier)
226 {
227 	switch (qualifier) {
228 	case RDS_LOOPBACK: /* loopback */
229 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
230 		(void) memset(rds_loopback_portmap, 0xFF, RDS_PORT_MAP_SIZE);
231 		rw_exit(&rds_loopback_portmap_lock);
232 		break;
233 
234 	case RDS_LOCAL: /* Session local */
235 		ASSERT(sp != NULL);
236 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
237 		(void) memset(sp->session_local_portmap, 0xFF,
238 		    RDS_PORT_MAP_SIZE);
239 		rw_exit(&sp->session_local_portmap_lock);
240 		break;
241 
242 	case RDS_REMOTE: /* Session remote */
243 		ASSERT(sp != NULL);
244 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
245 		(void) memset(sp->session_remote_portmap, 0xFF,
246 		    RDS_PORT_MAP_SIZE);
247 		rw_exit(&sp->session_remote_portmap_lock);
248 		break;
249 	}
250 }
251 
252 static void
253 rds_unmark_all_ports(rds_session_t *sp, uint_t qualifier)
254 {
255 	switch (qualifier) {
256 	case RDS_LOOPBACK: /* loopback */
257 		rw_enter(&rds_loopback_portmap_lock, RW_WRITER);
258 		bzero(rds_loopback_portmap, RDS_PORT_MAP_SIZE);
259 		rw_exit(&rds_loopback_portmap_lock);
260 		break;
261 
262 	case RDS_LOCAL: /* Session local */
263 		ASSERT(sp != NULL);
264 		rw_enter(&sp->session_local_portmap_lock, RW_WRITER);
265 		bzero(sp->session_local_portmap, RDS_PORT_MAP_SIZE);
266 		rw_exit(&sp->session_local_portmap_lock);
267 		break;
268 
269 	case RDS_REMOTE: /* Session remote */
270 		ASSERT(sp != NULL);
271 		rw_enter(&sp->session_remote_portmap_lock, RW_WRITER);
272 		bzero(sp->session_remote_portmap, RDS_PORT_MAP_SIZE);
273 		rw_exit(&sp->session_remote_portmap_lock);
274 		break;
275 	}
276 }
277 
278 static void
279 rds_add_session(rds_session_t *sp, boolean_t locked)
280 {
281 	RDS_DPRINTF2("rds_add_session", "Enter: SP(%p)", sp);
282 
283 	if (!locked) {
284 		rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
285 	}
286 
287 	sp->session_nextp = rdsib_statep->rds_sessionlistp;
288 	rdsib_statep->rds_sessionlistp = sp;
289 	rdsib_statep->rds_nsessions++;
290 
291 	if (!locked) {
292 		rw_exit(&rdsib_statep->rds_sessionlock);
293 	}
294 	RDS_INCR_SESS();
295 
296 	RDS_DPRINTF2("rds_add_session", "Return: SP(%p)", sp);
297 }
298 
299 /* Session lookup based on destination IP or destination node guid */
300 rds_session_t *
301 rds_session_lkup(rds_state_t *statep, ipaddr_t remoteip, ib_guid_t node_guid)
302 {
303 	rds_session_t	*sp;
304 
305 	RDS_DPRINTF4("rds_session_lkup", "Enter: 0x%p 0x%x 0x%llx", statep,
306 	    remoteip, node_guid);
307 
308 	/* A read/write lock is expected, will panic if none of them are held */
309 	ASSERT(rw_lock_held(&statep->rds_sessionlock));
310 	sp = statep->rds_sessionlistp;
311 	while (sp) {
312 		if ((sp->session_remip == remoteip) || ((node_guid != 0) &&
313 		    (sp->session_rgid.gid_guid == node_guid))) {
314 			break;
315 		}
316 
317 		sp = sp->session_nextp;
318 	}
319 
320 	RDS_DPRINTF4("rds_session_lkup", "Return: SP(%p)", sp);
321 
322 	return (sp);
323 }
324 
325 static void
326 rds_ep_fini(rds_ep_t *ep)
327 {
328 	RDS_DPRINTF3("rds_ep_fini", "Enter: EP(%p) type: %d", ep, ep->ep_type);
329 
330 	/* free send pool */
331 	rds_free_send_pool(ep);
332 
333 	/* free recv pool */
334 	rds_free_recv_pool(ep);
335 
336 	mutex_enter(&ep->ep_lock);
337 	ep->ep_hca_guid = 0;
338 	mutex_exit(&ep->ep_lock);
339 
340 	RDS_DPRINTF3("rds_ep_fini", "Return EP(%p)", ep);
341 }
342 
343 /* Assumes SP write lock is held */
344 int
345 rds_ep_init(rds_ep_t *ep, ib_guid_t hca_guid)
346 {
347 	uint_t		ret;
348 
349 	RDS_DPRINTF3("rds_ep_init", "Enter: EP(%p) Type: %d", ep, ep->ep_type);
350 
351 	/* send pool */
352 	ret = rds_init_send_pool(ep, hca_guid);
353 	if (ret != 0) {
354 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_send_pool failed: %d",
355 		    ep, ret);
356 		return (-1);
357 	}
358 
359 	/* recv pool */
360 	ret = rds_init_recv_pool(ep);
361 	if (ret != 0) {
362 		RDS_DPRINTF2(LABEL, "EP(%p): rds_init_recv_pool failed: %d",
363 		    ep, ret);
364 		rds_free_send_pool(ep);
365 		return (-1);
366 	}
367 
368 	/* reset the ep state */
369 	mutex_enter(&ep->ep_lock);
370 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
371 	ep->ep_hca_guid = hca_guid;
372 	ep->ep_lbufid = NULL;
373 	ep->ep_rbufid = NULL;
374 	ep->ep_segfbp = NULL;
375 	ep->ep_seglbp = NULL;
376 
377 	/* Initialize the WR to send acknowledgements */
378 	ep->ep_ackwr.wr_id = RDS_RDMAW_WRID;
379 	ep->ep_ackwr.wr_flags = IBT_WR_SEND_SOLICIT;
380 	ep->ep_ackwr.wr_trans = IBT_RC_SRV;
381 	ep->ep_ackwr.wr_opcode = IBT_WRC_RDMAW;
382 	ep->ep_ackwr.wr_nds = 1;
383 	ep->ep_ackwr.wr_sgl = &ep->ep_ackds;
384 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_raddr = NULL;
385 	ep->ep_ackwr.wr.rc.rcwr.rdma.rdma_rkey = 0;
386 	mutex_exit(&ep->ep_lock);
387 
388 	RDS_DPRINTF3("rds_ep_init", "Return: EP(%p) type: %d", ep, ep->ep_type);
389 
390 	return (0);
391 }
392 
393 static int
394 rds_ep_reinit(rds_ep_t *ep, ib_guid_t hca_guid)
395 {
396 	int	ret;
397 
398 	RDS_DPRINTF3("rds_ep_reinit", "Enter: EP(%p) Type: %d",
399 	    ep, ep->ep_type);
400 
401 	/* Re-initialize send pool */
402 	ret = rds_reinit_send_pool(ep, hca_guid);
403 	if (ret != 0) {
404 		RDS_DPRINTF2("rds_ep_reinit",
405 		    "EP(%p): rds_reinit_send_pool failed: %d", ep, ret);
406 		return (-1);
407 	}
408 
409 	/* free all the receive buffers in the pool */
410 	rds_free_recv_pool(ep);
411 
412 	RDS_DPRINTF3("rds_ep_reinit", "Return: EP(%p) Type: %d",
413 	    ep, ep->ep_type);
414 
415 	return (0);
416 }
417 
418 void
419 rds_session_fini(rds_session_t *sp)
420 {
421 	RDS_DPRINTF2("rds_session_fini", "Enter: SP(0x%p)", sp);
422 
423 	rds_ep_fini(&sp->session_dataep);
424 	rds_ep_fini(&sp->session_ctrlep);
425 
426 	RDS_DPRINTF2("rds_session_fini", "Return: SP(0x%p)", sp);
427 }
428 
429 /*
430  * Allocate and initialize the resources needed for the control and
431  * data channels
432  */
433 int
434 rds_session_init(rds_session_t *sp)
435 {
436 	int		ret;
437 	rds_hca_t	*hcap;
438 	ib_guid_t	hca_guid;
439 
440 	RDS_DPRINTF2("rds_session_init", "Enter: SP(0x%p)", sp);
441 
442 	/* CALLED WITH SESSION WRITE LOCK */
443 
444 	hcap = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
445 	if (hcap == NULL) {
446 		RDS_DPRINTF1("rds_session_init", "SGID is on an uninitialized "
447 		    "HCA: %llx", sp->session_lgid.gid_guid);
448 		return (-1);
449 	}
450 
451 	hca_guid = hcap->hca_guid;
452 
453 	/* allocate and initialize the ctrl channel */
454 	ret = rds_ep_init(&sp->session_ctrlep, hca_guid);
455 	if (ret != 0) {
456 		RDS_DPRINTF2(LABEL, "SP(%p): Ctrl EP(%p) initialization "
457 		    "failed", sp, &sp->session_ctrlep);
458 		return (-1);
459 	}
460 
461 	RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p)", sp, &sp->session_ctrlep);
462 
463 	/* allocate and initialize the data channel */
464 	ret = rds_ep_init(&sp->session_dataep, hca_guid);
465 	if (ret != 0) {
466 		RDS_DPRINTF2(LABEL, "SP(%p): Data EP(%p) initialization "
467 		    "failed", sp, &sp->session_dataep);
468 		rds_ep_fini(&sp->session_ctrlep);
469 		return (-1);
470 	}
471 
472 	/* Clear the portmaps */
473 	rds_unmark_all_ports(sp, RDS_LOCAL);
474 	rds_unmark_all_ports(sp, RDS_REMOTE);
475 
476 	RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p)", sp, &sp->session_dataep);
477 
478 	RDS_DPRINTF2("rds_session_init", "Return");
479 
480 	return (0);
481 }
482 
483 /*
484  * This should be called before moving a session from ERROR state to
485  * INIT state. This will update the HCA keys incase the session has moved from
486  * one HCA to another.
487  */
488 int
489 rds_session_reinit(rds_session_t *sp, ib_gid_t lgid)
490 {
491 	rds_hca_t	*hcap, *hcap1;
492 	int		ret;
493 
494 	RDS_DPRINTF2("rds_session_reinit", "Enter: SP(0x%p)", sp);
495 
496 	/* CALLED WITH SESSION WRITE LOCK */
497 
498 	hcap = rds_gid_to_hcap(rdsib_statep, lgid);
499 	if (hcap == NULL) {
500 		RDS_DPRINTF1("rds_session_reinit", "SGID is on an "
501 		    "uninitialized HCA: %llx", lgid.gid_guid);
502 		return (-1);
503 	}
504 
505 	hcap1 = rds_gid_to_hcap(rdsib_statep, sp->session_lgid);
506 	if (hcap1 == NULL) {
507 		RDS_DPRINTF1("rds_session_reinit", "Seems like HCA %llx "
508 		    "is unplugged", sp->session_lgid.gid_guid);
509 	} else if (hcap->hca_guid == hcap1->hca_guid) {
510 		/*
511 		 * No action is needed as the session did not move across
512 		 * HCAs
513 		 */
514 		RDS_DPRINTF2("rds_session_reinit", "Failover on the same HCA");
515 		return (0);
516 	}
517 
518 	RDS_DPRINTF2("rds_session_reinit", "Failover across HCAs");
519 
520 	/* re-initialize the control channel */
521 	ret = rds_ep_reinit(&sp->session_ctrlep, hcap->hca_guid);
522 	if (ret != 0) {
523 		RDS_DPRINTF2("rds_session_reinit",
524 		    "SP(%p): Ctrl EP(%p) re-initialization failed",
525 		    sp, &sp->session_ctrlep);
526 		return (-1);
527 	}
528 
529 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Control EP(%p)",
530 	    sp, &sp->session_ctrlep);
531 
532 	/* re-initialize the data channel */
533 	ret = rds_ep_reinit(&sp->session_dataep, hcap->hca_guid);
534 	if (ret != 0) {
535 		RDS_DPRINTF2("rds_session_reinit",
536 		    "SP(%p): Data EP(%p) re-initialization failed",
537 		    sp, &sp->session_dataep);
538 		return (-1);
539 	}
540 
541 	RDS_DPRINTF2("rds_session_reinit", "SP(%p) Data EP(%p)",
542 	    sp, &sp->session_dataep);
543 
544 	sp->session_lgid = lgid;
545 
546 	/* Clear the portmaps */
547 	rds_unmark_all_ports(sp, RDS_LOCAL);
548 	rds_unmark_all_ports(sp, RDS_REMOTE);
549 
550 	RDS_DPRINTF2("rds_session_reinit", "Return: SP(0x%p)", sp);
551 
552 	return (0);
553 }
554 
555 static int
556 rds_session_connect(rds_session_t *sp)
557 {
558 	ibt_channel_hdl_t	ctrlchan, datachan;
559 	rds_ep_t		*ep;
560 	int			ret;
561 
562 	RDS_DPRINTF2("rds_session_connect", "Enter SP(%p)", sp);
563 
564 	sp->session_pinfo.pi_sid = rdsib_statep->rds_service_id;
565 
566 	/* Override the packet life time based on the conf file */
567 	if (IBPktLifeTime != 0) {
568 		sp->session_pinfo.pi_prim_cep_path.cep_cm_opaque1 =
569 		    IBPktLifeTime;
570 	}
571 
572 	/* Session type may change if we run into peer-to-peer case. */
573 	rw_enter(&sp->session_lock, RW_READER);
574 	if (sp->session_type == RDS_SESSION_PASSIVE) {
575 		RDS_DPRINTF2("rds_session_connect", "SP(%p) is no longer the "
576 		    "active end", sp);
577 		rw_exit(&sp->session_lock);
578 		return (0); /* return success */
579 	}
580 	rw_exit(&sp->session_lock);
581 
582 	/* connect the data ep first */
583 	ep = &sp->session_dataep;
584 	mutex_enter(&ep->ep_lock);
585 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
586 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
587 		mutex_exit(&ep->ep_lock);
588 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
589 		    &datachan);
590 		if (ret != IBT_SUCCESS) {
591 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
592 			    "failed: %d", ep, ret);
593 			return (-1);
594 		}
595 		sp->session_dataep.ep_chanhdl = datachan;
596 	} else {
597 		RDS_DPRINTF2(LABEL, "SP(%p) Data EP(%p) is in "
598 		    "unexpected state: %d", sp, ep, ep->ep_state);
599 		mutex_exit(&ep->ep_lock);
600 		return (-1);
601 	}
602 
603 	RDS_DPRINTF3(LABEL, "SP(%p) EP(%p): Data channel is connected",
604 	    sp, ep);
605 
606 	ep = &sp->session_ctrlep;
607 	mutex_enter(&ep->ep_lock);
608 	if (ep->ep_state == RDS_EP_STATE_UNCONNECTED) {
609 		ep->ep_state = RDS_EP_STATE_ACTIVE_PENDING;
610 		mutex_exit(&ep->ep_lock);
611 		ret = rds_open_rc_channel(ep, &sp->session_pinfo, IBT_BLOCKING,
612 		    &ctrlchan);
613 		if (ret != IBT_SUCCESS) {
614 			RDS_DPRINTF2(LABEL, "EP(%p): rds_open_rc_channel "
615 			    "failed: %d", ep, ret);
616 			return (-1);
617 		}
618 		sp->session_ctrlep.ep_chanhdl = ctrlchan;
619 	} else {
620 		RDS_DPRINTF2(LABEL, "SP(%p) Control EP(%p) is in "
621 		    "unexpected state: %d", sp, ep, ep->ep_state);
622 		mutex_exit(&ep->ep_lock);
623 		return (-1);
624 	}
625 
626 	RDS_DPRINTF2(LABEL, "Session (%p) 0x%x <--> 0x%x is CONNECTED",
627 	    sp, sp->session_myip, sp->session_remip);
628 
629 	RDS_DPRINTF2("rds_session_connect", "Return SP(%p)", sp);
630 
631 	return (0);
632 }
633 
634 /*
635  * Can be called with or without session_lock.
636  */
637 void
638 rds_session_close(rds_session_t *sp, ibt_execution_mode_t mode, uint_t wait)
639 {
640 	rds_ep_t		*ep;
641 
642 	RDS_DPRINTF2("rds_session_close", "SP(%p) State: %d", sp,
643 	    sp->session_state);
644 
645 	ep = &sp->session_dataep;
646 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
647 
648 	/* wait until the SQ is empty before closing */
649 	(void) rds_is_sendq_empty(ep, wait);
650 
651 	mutex_enter(&ep->ep_lock);
652 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
653 		mutex_exit(&ep->ep_lock);
654 		delay(drv_usectohz(300000));
655 		mutex_enter(&ep->ep_lock);
656 	}
657 
658 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
659 		ep->ep_state = RDS_EP_STATE_CLOSING;
660 		mutex_exit(&ep->ep_lock);
661 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
662 		mutex_enter(&ep->ep_lock);
663 	}
664 	rds_ep_free_rc_channel(ep);
665 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
666 	ep->ep_segfbp = NULL;
667 	ep->ep_seglbp = NULL;
668 	mutex_exit(&ep->ep_lock);
669 
670 	ep = &sp->session_ctrlep;
671 	RDS_DPRINTF3(LABEL, "EP(%p) State: %d", ep, ep->ep_state);
672 
673 	/* wait until the SQ is empty before closing */
674 	(void) rds_is_sendq_empty(ep, 1);
675 
676 	mutex_enter(&ep->ep_lock);
677 	while (ep->ep_state == RDS_EP_STATE_CLOSING) {
678 		mutex_exit(&ep->ep_lock);
679 		delay(drv_usectohz(300000));
680 		mutex_enter(&ep->ep_lock);
681 	}
682 
683 	if (ep->ep_state == RDS_EP_STATE_CONNECTED) {
684 		ep->ep_state = RDS_EP_STATE_CLOSING;
685 		mutex_exit(&ep->ep_lock);
686 		(void) rds_close_rc_channel(ep->ep_chanhdl, mode);
687 		mutex_enter(&ep->ep_lock);
688 	}
689 	rds_ep_free_rc_channel(ep);
690 	ep->ep_state = RDS_EP_STATE_UNCONNECTED;
691 	ep->ep_segfbp = NULL;
692 	ep->ep_seglbp = NULL;
693 	mutex_exit(&ep->ep_lock);
694 
695 	RDS_DPRINTF2("rds_session_close", "Return (%p)", sp);
696 }
697 
698 /* Free the session */
699 static void
700 rds_destroy_session(rds_session_t *sp)
701 {
702 	rds_ep_t	*ep;
703 	rds_bufpool_t	*pool;
704 
705 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
706 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
707 	    (sp->session_state == RDS_SESSION_STATE_FINI) ||
708 	    (sp->session_state == RDS_SESSION_STATE_PASSIVE_CLOSING));
709 
710 	rw_enter(&sp->session_lock, RW_READER);
711 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d", sp,
712 	    sp->session_state);
713 	while (!((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
714 	    (sp->session_state == RDS_SESSION_STATE_FAILED) ||
715 	    (sp->session_state == RDS_SESSION_STATE_FINI))) {
716 		rw_exit(&sp->session_lock);
717 		delay(drv_usectohz(1000000));
718 		rw_enter(&sp->session_lock, RW_READER);
719 		RDS_DPRINTF2("rds_destroy_session", "SP(%p) State: %d WAITING "
720 		    "ON SESSION", sp, sp->session_state);
721 	}
722 	rw_exit(&sp->session_lock);
723 
724 	/* data channel */
725 	ep = &sp->session_dataep;
726 
727 	/* send pool locks */
728 	pool = &ep->ep_sndpool;
729 	cv_destroy(&pool->pool_cv);
730 	mutex_destroy(&pool->pool_lock);
731 
732 	/* recv pool locks */
733 	pool = &ep->ep_rcvpool;
734 	cv_destroy(&pool->pool_cv);
735 	mutex_destroy(&pool->pool_lock);
736 	mutex_destroy(&ep->ep_recvqp.qp_lock);
737 
738 	/* control channel */
739 	ep = &sp->session_ctrlep;
740 
741 	/* send pool locks */
742 	pool = &ep->ep_sndpool;
743 	cv_destroy(&pool->pool_cv);
744 	mutex_destroy(&pool->pool_lock);
745 
746 	/* recv pool locks */
747 	pool = &ep->ep_rcvpool;
748 	cv_destroy(&pool->pool_cv);
749 	mutex_destroy(&pool->pool_lock);
750 	mutex_destroy(&ep->ep_recvqp.qp_lock);
751 
752 	/* session */
753 	rw_destroy(&sp->session_lock);
754 	rw_destroy(&sp->session_local_portmap_lock);
755 	rw_destroy(&sp->session_remote_portmap_lock);
756 
757 	/* free the session */
758 	kmem_free(sp, sizeof (rds_session_t));
759 
760 	RDS_DPRINTF2("rds_destroy_session", "SP(%p) Return", sp);
761 }
762 
763 /* This is called on the taskq thread */
764 static void
765 rds_failover_session(void *arg)
766 {
767 	rds_session_t	*sp = (rds_session_t *)arg;
768 	ib_gid_t	lgid, rgid;
769 	ipaddr_t	myip, remip;
770 	int		ret, cnt = 0;
771 
772 	RDS_DPRINTF2("rds_failover_session", "Enter: (%p)", sp);
773 
774 	RDS_INCR_FAILOVERS();
775 
776 	rw_enter(&sp->session_lock, RW_WRITER);
777 	if (sp->session_type != RDS_SESSION_ACTIVE) {
778 		/*
779 		 * The remote side must have seen the error and initiated
780 		 * a re-connect.
781 		 */
782 		RDS_DPRINTF2("rds_failover_session",
783 		    "SP(%p) has become passive", sp);
784 		rw_exit(&sp->session_lock);
785 		return;
786 	}
787 	sp->session_failover = 1;
788 	rw_exit(&sp->session_lock);
789 
790 	/*
791 	 * The session is in ERROR state but close both channels
792 	 * for a clean start.
793 	 */
794 	rds_session_close(sp, IBT_BLOCKING, 1);
795 
796 	/* wait 1 sec before re-connecting */
797 	delay(drv_usectohz(1000000));
798 
799 	do {
800 		ibt_ip_path_attr_t	ipattr;
801 		ibt_ip_addr_t		dstip;
802 
803 		/* The ipaddr should be in the network order */
804 		myip = sp->session_myip;
805 		remip = sp->session_remip;
806 		ret = rds_sc_path_lookup(&myip, &remip);
807 		if (ret == 0) {
808 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
809 			    myip, remip);
810 		}
811 		/* check if we have (new) path from the source to destination */
812 		lgid.gid_prefix = 0;
813 		lgid.gid_guid = 0;
814 		rgid.gid_prefix = 0;
815 		rgid.gid_guid = 0;
816 
817 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
818 		dstip.family = AF_INET;
819 		dstip.un.ip4addr = htonl(remip);
820 		ipattr.ipa_dst_ip = &dstip;
821 		ipattr.ipa_src_ip.family = AF_INET;
822 		ipattr.ipa_src_ip.un.ip4addr = htonl(myip);
823 		ipattr.ipa_ndst = 1;
824 		ipattr.ipa_max_paths = 1;
825 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
826 		    myip, remip);
827 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
828 		    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo, NULL, NULL);
829 		if (ret == IBT_SUCCESS) {
830 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
831 			lgid = sp->session_pinfo.
832 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
833 			rgid = sp->session_pinfo.
834 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
835 			break;
836 		}
837 
838 		RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d ", ret);
839 
840 		/* wait 1 sec before re-trying */
841 		delay(drv_usectohz(1000000));
842 		cnt++;
843 	} while (cnt < 5);
844 
845 	if (ret != IBT_SUCCESS) {
846 		rw_enter(&sp->session_lock, RW_WRITER);
847 		if (sp->session_type == RDS_SESSION_ACTIVE) {
848 			rds_session_fini(sp);
849 			sp->session_state = RDS_SESSION_STATE_FAILED;
850 			sp->session_failover = 0;
851 			RDS_DPRINTF3("rds_failover_session",
852 			    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
853 		} else {
854 			RDS_DPRINTF2("rds_failover_session",
855 			    "SP(%p) has become passive", sp);
856 		}
857 		rw_exit(&sp->session_lock);
858 		return;
859 	}
860 
861 	RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
862 	    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
863 	    rgid.gid_guid);
864 
865 	rw_enter(&sp->session_lock, RW_WRITER);
866 	if (sp->session_type != RDS_SESSION_ACTIVE) {
867 		/*
868 		 * The remote side must have seen the error and initiated
869 		 * a re-connect.
870 		 */
871 		RDS_DPRINTF2("rds_failover_session",
872 		    "SP(%p) has become passive", sp);
873 		rw_exit(&sp->session_lock);
874 		return;
875 	}
876 
877 	/* move the session to init state */
878 	ret = rds_session_reinit(sp, lgid);
879 	sp->session_lgid = lgid;
880 	sp->session_rgid = rgid;
881 	if (ret != 0) {
882 		rds_session_fini(sp);
883 		sp->session_state = RDS_SESSION_STATE_FAILED;
884 		sp->session_failover = 0;
885 		RDS_DPRINTF3("rds_failover_session",
886 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
887 		rw_exit(&sp->session_lock);
888 		return;
889 	} else {
890 		sp->session_state = RDS_SESSION_STATE_INIT;
891 		RDS_DPRINTF3("rds_failover_session",
892 		    "SP(%p) State RDS_SESSION_STATE_INIT", sp);
893 	}
894 	rw_exit(&sp->session_lock);
895 
896 	rds_session_open(sp);
897 
898 	RDS_DPRINTF2("rds_failover_session", "Return: (%p)", sp);
899 }
900 
901 void
902 rds_handle_send_error(rds_ep_t *ep)
903 {
904 	if (rds_is_sendq_empty(ep, 0)) {
905 		/* Session should already be in ERROR, try to reconnect */
906 		RDS_DPRINTF2("rds_handle_send_error",
907 		    "Dispatching taskq to failover SP(%p)", ep->ep_sp);
908 		(void) ddi_taskq_dispatch(rds_taskq, rds_failover_session,
909 		    (void *)ep->ep_sp, DDI_SLEEP);
910 	}
911 }
912 
913 /*
914  * Called in the CM handler on the passive side
915  * Called on a taskq thread.
916  */
917 void
918 rds_cleanup_passive_session(void *arg)
919 {
920 	rds_session_t	*sp = arg;
921 
922 	RDS_DPRINTF2("rds_cleanup_passive_session", "SP(%p) State: %d", sp,
923 	    sp->session_state);
924 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
925 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
926 
927 	rds_session_close(sp, IBT_BLOCKING, 1);
928 
929 	rw_enter(&sp->session_lock, RW_WRITER);
930 	if (sp->session_state == RDS_SESSION_STATE_CLOSED) {
931 		rds_session_fini(sp);
932 		sp->session_state = RDS_SESSION_STATE_FINI;
933 		sp->session_failover = 0;
934 		RDS_DPRINTF3("rds_cleanup_passive_session",
935 		    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
936 	} else if (sp->session_state == RDS_SESSION_STATE_ERROR) {
937 		rds_session_fini(sp);
938 		sp->session_state = RDS_SESSION_STATE_FAILED;
939 		sp->session_failover = 0;
940 		RDS_DPRINTF3("rds_cleanup_passive_session",
941 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
942 	}
943 	rw_exit(&sp->session_lock);
944 
945 	RDS_DPRINTF2("rds_cleanup_passive_session", "Return: SP (%p)", sp);
946 }
947 
948 /*
949  * Called by the CM handler on the passive side
950  * Called with WRITE lock on the session
951  */
952 void
953 rds_passive_session_fini(rds_session_t *sp)
954 {
955 	rds_ep_t	*ep;
956 
957 	RDS_DPRINTF2("rds_passive_session_fini", "SP(%p) State: %d", sp,
958 	    sp->session_state);
959 	ASSERT((sp->session_state == RDS_SESSION_STATE_CLOSED) ||
960 	    (sp->session_state == RDS_SESSION_STATE_ERROR));
961 
962 	/* clean the data channel */
963 	ep = &sp->session_dataep;
964 	(void) rds_is_sendq_empty(ep, 1);
965 	mutex_enter(&ep->ep_lock);
966 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
967 	    ep->ep_state);
968 	rds_ep_free_rc_channel(ep);
969 	mutex_exit(&ep->ep_lock);
970 
971 	/* clean the control channel */
972 	ep = &sp->session_ctrlep;
973 	(void) rds_is_sendq_empty(ep, 1);
974 	mutex_enter(&ep->ep_lock);
975 	RDS_DPRINTF2("rds_passive_session_fini", "EP(%p) State: %d", ep,
976 	    ep->ep_state);
977 	rds_ep_free_rc_channel(ep);
978 	mutex_exit(&ep->ep_lock);
979 
980 	rds_session_fini(sp);
981 	sp->session_failover = 0;
982 
983 	RDS_DPRINTF2("rds_passive_session_fini", "Return: SP (%p)", sp);
984 }
985 
986 /*
987  * Can be called:
988  * 1. on driver detach
989  * 2. on taskq thread
990  * arg is always NULL
991  */
992 /* ARGSUSED */
993 void
994 rds_close_sessions(void *arg)
995 {
996 	rds_session_t *sp, *spnextp;
997 
998 	RDS_DPRINTF2("rds_close_sessions", "Enter");
999 
1000 	/* wait until all the buffers are freed by the sockets */
1001 	while (RDS_GET_RXPKTS_PEND() != 0) {
1002 		/* wait one second and try again */
1003 		RDS_DPRINTF2("rds_close_sessions", "waiting on "
1004 		    "pending packets", RDS_GET_RXPKTS_PEND());
1005 		delay(drv_usectohz(1000000));
1006 	}
1007 	RDS_DPRINTF2("rds_close_sessions", "No more RX packets pending");
1008 
1009 	/* close all the sessions */
1010 	rw_enter(&rdsib_statep->rds_sessionlock, RW_WRITER);
1011 	sp = rdsib_statep->rds_sessionlistp;
1012 	while (sp) {
1013 		rw_enter(&sp->session_lock, RW_WRITER);
1014 		RDS_DPRINTF2("rds_close_sessions", "SP(%p) State: %d", sp,
1015 		    sp->session_state);
1016 
1017 		switch (sp->session_state) {
1018 		case RDS_SESSION_STATE_CONNECTED:
1019 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1020 			rw_exit(&sp->session_lock);
1021 
1022 			rds_session_close(sp, IBT_BLOCKING, 2);
1023 
1024 			rw_enter(&sp->session_lock, RW_WRITER);
1025 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1026 			RDS_DPRINTF3("rds_close_sessions",
1027 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1028 			rds_session_fini(sp);
1029 			sp->session_state = RDS_SESSION_STATE_FINI;
1030 			sp->session_failover = 0;
1031 			RDS_DPRINTF3("rds_close_sessions",
1032 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1033 			break;
1034 
1035 		case RDS_SESSION_STATE_ERROR:
1036 		case RDS_SESSION_STATE_PASSIVE_CLOSING:
1037 		case RDS_SESSION_STATE_INIT:
1038 			sp->session_state = RDS_SESSION_STATE_ACTIVE_CLOSING;
1039 			rw_exit(&sp->session_lock);
1040 
1041 			rds_session_close(sp, IBT_BLOCKING, 1);
1042 
1043 			rw_enter(&sp->session_lock, RW_WRITER);
1044 			sp->session_state = RDS_SESSION_STATE_CLOSED;
1045 			RDS_DPRINTF3("rds_close_sessions",
1046 			    "SP(%p) State RDS_SESSION_STATE_CLOSED", sp);
1047 			/* FALLTHRU */
1048 		case RDS_SESSION_STATE_CLOSED:
1049 			rds_session_fini(sp);
1050 			sp->session_state = RDS_SESSION_STATE_FINI;
1051 			sp->session_failover = 0;
1052 			RDS_DPRINTF3("rds_close_sessions",
1053 			    "SP(%p) State RDS_SESSION_STATE_FINI", sp);
1054 			break;
1055 		}
1056 
1057 		rw_exit(&sp->session_lock);
1058 		sp = sp->session_nextp;
1059 	}
1060 
1061 	sp = rdsib_statep->rds_sessionlistp;
1062 	rdsib_statep->rds_sessionlistp = NULL;
1063 	rdsib_statep->rds_nsessions = 0;
1064 	rw_exit(&rdsib_statep->rds_sessionlock);
1065 
1066 	while (sp) {
1067 		spnextp = sp->session_nextp;
1068 		rds_destroy_session(sp);
1069 		RDS_DECR_SESS();
1070 		sp = spnextp;
1071 	}
1072 
1073 	/* free the global pool */
1074 	rds_free_recv_caches(rdsib_statep);
1075 
1076 	RDS_DPRINTF2("rds_close_sessions", "Return");
1077 }
1078 
1079 void
1080 rds_session_open(rds_session_t *sp)
1081 {
1082 	int		ret;
1083 
1084 	RDS_DPRINTF2("rds_session_open", "Enter SP(%p)", sp);
1085 
1086 	ret = rds_session_connect(sp);
1087 	if (ret == -1) {
1088 		/*
1089 		 * may be the session has become passive due to
1090 		 * hitting peer-to-peer case
1091 		 */
1092 		rw_enter(&sp->session_lock, RW_READER);
1093 		if (sp->session_type == RDS_SESSION_PASSIVE) {
1094 			RDS_DPRINTF2("rds_session_open", "SP(%p) "
1095 			    "has become passive from active", sp);
1096 			rw_exit(&sp->session_lock);
1097 			return;
1098 		}
1099 
1100 		/* get the lock for writing */
1101 		rw_exit(&sp->session_lock);
1102 		rw_enter(&sp->session_lock, RW_WRITER);
1103 		sp->session_state = RDS_SESSION_STATE_ERROR;
1104 		RDS_DPRINTF3("rds_session_open",
1105 		    "SP(%p) State RDS_SESSION_STATE_ERROR", sp);
1106 		rw_exit(&sp->session_lock);
1107 
1108 		/* Connect request failed */
1109 		rds_session_close(sp, IBT_BLOCKING, 1);
1110 
1111 		rw_enter(&sp->session_lock, RW_WRITER);
1112 		rds_session_fini(sp);
1113 		sp->session_state = RDS_SESSION_STATE_FAILED;
1114 		sp->session_failover = 0;
1115 		RDS_DPRINTF3("rds_session_open",
1116 		    "SP(%p) State RDS_SESSION_STATE_FAILED", sp);
1117 		rw_exit(&sp->session_lock);
1118 
1119 		return;
1120 	}
1121 
1122 	RDS_DPRINTF2("rds_session_open", "Return: SP(%p)", sp);
1123 }
1124 
1125 /*
1126  * Creates a session and inserts it into the list of sessions. The session
1127  * state would be CREATED.
1128  * Return Values:
1129  *	EWOULDBLOCK
1130  */
1131 rds_session_t *
1132 rds_session_create(rds_state_t *statep, ipaddr_t localip, ipaddr_t remip,
1133     ibt_cm_req_rcv_t *reqp, uint8_t type)
1134 {
1135 	ib_gid_t	lgid, rgid;
1136 	rds_session_t	*newp, *oldp;
1137 	rds_ep_t	*dataep, *ctrlep;
1138 	rds_bufpool_t	*pool;
1139 	int		ret;
1140 
1141 	RDS_DPRINTF2("rds_session_create", "Enter: 0x%p 0x%x 0x%x",
1142 	    statep, localip, remip);
1143 
1144 	/* Allocate and initialize global buffer pool */
1145 	ret = rds_init_recv_caches(statep);
1146 	if (ret != 0) {
1147 		RDS_DPRINTF2(LABEL, "Buffer Cache Initialization failed");
1148 		return (NULL);
1149 	}
1150 
1151 	/* enough memory for session (includes 2 endpoints) */
1152 	newp = kmem_zalloc(sizeof (rds_session_t), KM_SLEEP);
1153 
1154 	newp->session_remip = remip;
1155 	newp->session_myip = localip;
1156 	newp->session_type = type;
1157 	newp->session_state = RDS_SESSION_STATE_CREATED;
1158 	RDS_DPRINTF3("rds_session_create",
1159 	    "SP(%p) State RDS_SESSION_STATE_CREATED", newp);
1160 	rw_init(&newp->session_lock, NULL, RW_DRIVER, NULL);
1161 	rw_init(&newp->session_local_portmap_lock, NULL, RW_DRIVER, NULL);
1162 	rw_init(&newp->session_remote_portmap_lock, NULL, RW_DRIVER, NULL);
1163 
1164 	/* Initialize data endpoint */
1165 	dataep = &newp->session_dataep;
1166 	dataep->ep_remip = newp->session_remip;
1167 	dataep->ep_myip = newp->session_myip;
1168 	dataep->ep_state = RDS_EP_STATE_UNCONNECTED;
1169 	dataep->ep_sp = newp;
1170 	dataep->ep_type = RDS_EP_TYPE_DATA;
1171 	mutex_init(&dataep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1172 
1173 	/* Initialize send pool locks */
1174 	pool = &dataep->ep_sndpool;
1175 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1176 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1177 
1178 	/* Initialize recv pool locks */
1179 	pool = &dataep->ep_rcvpool;
1180 	mutex_init(&dataep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1181 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1182 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1183 
1184 	/* Initialize control endpoint */
1185 	ctrlep = &newp->session_ctrlep;
1186 	ctrlep->ep_remip = newp->session_remip;
1187 	ctrlep->ep_myip = newp->session_myip;
1188 	ctrlep->ep_state = RDS_EP_STATE_UNCONNECTED;
1189 	ctrlep->ep_sp = newp;
1190 	ctrlep->ep_type = RDS_EP_TYPE_CTRL;
1191 	mutex_init(&ctrlep->ep_lock, NULL, MUTEX_DRIVER, NULL);
1192 
1193 	/* Initialize send pool locks */
1194 	pool = &ctrlep->ep_sndpool;
1195 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1196 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1197 
1198 	/* Initialize recv pool locks */
1199 	pool = &ctrlep->ep_rcvpool;
1200 	mutex_init(&ctrlep->ep_recvqp.qp_lock, NULL, MUTEX_DRIVER, NULL);
1201 	mutex_init(&pool->pool_lock, NULL, MUTEX_DRIVER, NULL);
1202 	cv_init(&pool->pool_cv, NULL, CV_DRIVER, NULL);
1203 
1204 	/* lkup if there is already a session */
1205 	rw_enter(&statep->rds_sessionlock, RW_WRITER);
1206 	oldp = rds_session_lkup(statep, remip, 0);
1207 	if (oldp != NULL) {
1208 		/* A session to this destination exists */
1209 		rw_exit(&statep->rds_sessionlock);
1210 		rw_destroy(&newp->session_lock);
1211 		rw_destroy(&newp->session_local_portmap_lock);
1212 		rw_destroy(&newp->session_remote_portmap_lock);
1213 		mutex_destroy(&dataep->ep_lock);
1214 		mutex_destroy(&ctrlep->ep_lock);
1215 		kmem_free(newp, sizeof (rds_session_t));
1216 		return (NULL);
1217 	}
1218 
1219 	/* Insert this session into the list */
1220 	rds_add_session(newp, B_TRUE);
1221 
1222 	/* unlock the session list */
1223 	rw_exit(&statep->rds_sessionlock);
1224 
1225 	if (type == RDS_SESSION_ACTIVE) {
1226 		ipaddr_t localip1, remip1;
1227 		ibt_ip_path_attr_t	ipattr;
1228 		ibt_ip_addr_t		dstip;
1229 
1230 		/* The ipaddr should be in the network order */
1231 		localip1 = localip;
1232 		remip1 = remip;
1233 		ret = rds_sc_path_lookup(&localip1, &remip1);
1234 		if (ret == 0) {
1235 			RDS_DPRINTF2(LABEL, "Path not found (0x%x 0x%x)",
1236 			    localip, remip);
1237 		}
1238 
1239 		/* Get the gids for the source and destination ip addrs */
1240 		lgid.gid_prefix = 0;
1241 		lgid.gid_guid = 0;
1242 		rgid.gid_prefix = 0;
1243 		rgid.gid_guid = 0;
1244 
1245 		bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1246 		dstip.family = AF_INET;
1247 		dstip.un.ip4addr = ntohl(remip1);
1248 		ipattr.ipa_dst_ip = &dstip;
1249 		ipattr.ipa_src_ip.family = AF_INET;
1250 		ipattr.ipa_src_ip.un.ip4addr = ntohl(localip1);
1251 		ipattr.ipa_ndst = 1;
1252 		ipattr.ipa_max_paths = 1;
1253 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
1254 		    localip1, remip1);
1255 		ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
1256 		    IBT_PATH_NO_FLAGS, &ipattr, &newp->session_pinfo,
1257 		    NULL, NULL);
1258 		if (ret != IBT_SUCCESS) {
1259 			RDS_DPRINTF1(LABEL, "ibt_get_ip_paths failed, ret: %d "
1260 			    "lgid: %llx:%llx rgid: %llx:%llx", lgid.gid_prefix,
1261 			    lgid.gid_guid, rgid.gid_prefix, rgid.gid_guid);
1262 
1263 			RDS_SESSION_TRANSITION(newp, RDS_SESSION_STATE_FAILED);
1264 			return (NULL);
1265 		}
1266 		RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
1267 		lgid =
1268 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_sgid;
1269 		rgid =
1270 		    newp->session_pinfo.pi_prim_cep_path.cep_adds_vect.av_dgid;
1271 
1272 		RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
1273 		    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
1274 		    rgid.gid_guid);
1275 	}
1276 
1277 	rw_enter(&newp->session_lock, RW_WRITER);
1278 	/* check for peer-to-peer case */
1279 	if (type == newp->session_type) {
1280 		/* no peer-to-peer case */
1281 		if (type == RDS_SESSION_ACTIVE) {
1282 			newp->session_lgid = lgid;
1283 			newp->session_rgid = rgid;
1284 		} else {
1285 			/* rgid is requester gid & lgid is receiver gid */
1286 			newp->session_rgid = reqp->req_prim_addr.av_dgid;
1287 			newp->session_lgid = reqp->req_prim_addr.av_sgid;
1288 		}
1289 	}
1290 	rw_exit(&newp->session_lock);
1291 
1292 	RDS_DPRINTF2("rds_session_create", "Return SP(%p)", newp);
1293 
1294 	return (newp);
1295 }
1296 
1297 void
1298 rds_handle_control_message(rds_session_t *sp, rds_ctrl_pkt_t *cpkt)
1299 {
1300 	RDS_DPRINTF4("rds_handle_control_message", "Enter: SP(%p) code: %d "
1301 	    "port: %d", sp, cpkt->rcp_code, cpkt->rcp_port);
1302 
1303 	switch (cpkt->rcp_code) {
1304 	case RDS_CTRL_CODE_STALL:
1305 		RDS_INCR_STALLS_RCVD();
1306 		(void) rds_check_n_mark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1307 		break;
1308 	case RDS_CTRL_CODE_UNSTALL:
1309 		RDS_INCR_UNSTALLS_RCVD();
1310 		(void) rds_check_n_unmark_port(sp, cpkt->rcp_port, RDS_REMOTE);
1311 		break;
1312 	case RDS_CTRL_CODE_STALL_PORTS:
1313 		rds_mark_all_ports(sp, RDS_REMOTE);
1314 		break;
1315 	case RDS_CTRL_CODE_UNSTALL_PORTS:
1316 		rds_unmark_all_ports(sp, RDS_REMOTE);
1317 		break;
1318 	case RDS_CTRL_CODE_HEARTBEAT:
1319 		break;
1320 	default:
1321 		RDS_DPRINTF2(LABEL, "ERROR: Invalid Control code: %d",
1322 		    cpkt->rcp_code);
1323 		break;
1324 	}
1325 
1326 	RDS_DPRINTF4("rds_handle_control_message", "Return");
1327 }
1328 
1329 int
1330 rds_post_control_message(rds_session_t *sp, uint8_t code, in_port_t port)
1331 {
1332 	ibt_send_wr_t	wr;
1333 	rds_ep_t	*ep;
1334 	rds_buf_t	*bp;
1335 	rds_ctrl_pkt_t	*cp;
1336 	int		ret;
1337 
1338 	RDS_DPRINTF4("rds_post_control_message", "Enter: SP(%p) Code: %d "
1339 	    "Port: %d", sp, code, port);
1340 
1341 	ep = &sp->session_ctrlep;
1342 
1343 	bp = rds_get_send_buf(ep, 1);
1344 	if (bp == NULL) {
1345 		RDS_DPRINTF2(LABEL, "No buffers available to send control "
1346 		    "message: SP(%p) Code: %d Port: %d", sp, code,
1347 		    port);
1348 		return (-1);
1349 	}
1350 
1351 	cp = (rds_ctrl_pkt_t *)(uintptr_t)bp->buf_ds.ds_va;
1352 	cp->rcp_code = code;
1353 	cp->rcp_port = port;
1354 	bp->buf_ds.ds_len = RDS_CTRLPKT_SIZE;
1355 
1356 	wr.wr_id = (uintptr_t)bp;
1357 	wr.wr_flags = IBT_WR_SEND_SOLICIT;
1358 	wr.wr_trans = IBT_RC_SRV;
1359 	wr.wr_opcode = IBT_WRC_SEND;
1360 	wr.wr_nds = 1;
1361 	wr.wr_sgl = &bp->buf_ds;
1362 	RDS_DPRINTF5(LABEL, "ds_va %p ds_len %d ds_lkey 0x%llx",
1363 	    bp->buf_ds.ds_va, bp->buf_ds.ds_len, bp->buf_ds.ds_key);
1364 	ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1365 	if (ret != IBT_SUCCESS) {
1366 		RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1367 		    "%d", ep, ret);
1368 		bp->buf_state = RDS_SNDBUF_FREE;
1369 		rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1370 		return (-1);
1371 	}
1372 
1373 	RDS_DPRINTF4("rds_post_control_message", "Return SP(%p) Code: %d "
1374 	    "Port: %d", sp, code, port);
1375 
1376 	return (0);
1377 }
1378 
1379 void
1380 rds_stall_port(rds_session_t *sp, in_port_t port, uint_t qualifier)
1381 {
1382 	int		ret;
1383 
1384 	RDS_DPRINTF4("rds_stall_port", "Enter: SP(%p) Port %d", sp, port);
1385 
1386 	RDS_INCR_STALLS_TRIGGERED();
1387 
1388 	if (!rds_check_n_mark_port(sp, port, qualifier)) {
1389 
1390 		if (sp != NULL) {
1391 			ret = rds_post_control_message(sp,
1392 			    RDS_CTRL_CODE_STALL, port);
1393 			if (ret != 0) {
1394 				(void) rds_check_n_unmark_port(sp, port,
1395 				    qualifier);
1396 				return;
1397 			}
1398 			RDS_INCR_STALLS_SENT();
1399 		}
1400 	} else {
1401 		RDS_DPRINTF3(LABEL,
1402 		    "Port %d is already in stall state", port);
1403 	}
1404 
1405 	RDS_DPRINTF4("rds_stall_port", "Return: SP(%p) Port %d", sp, port);
1406 }
1407 
1408 void
1409 rds_resume_port(in_port_t port)
1410 {
1411 	rds_session_t	*sp;
1412 	uint_t		ix;
1413 	int		ret;
1414 
1415 	RDS_DPRINTF4("rds_resume_port", "Enter: Port %d", port);
1416 
1417 	RDS_INCR_UNSTALLS_TRIGGERED();
1418 
1419 	/* resume loopback traffic */
1420 	(void) rds_check_n_unmark_port(NULL, port, RDS_LOOPBACK);
1421 
1422 	/* send unstall messages to resume the remote traffic */
1423 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1424 
1425 	sp = rdsib_statep->rds_sessionlistp;
1426 	for (ix = 0; ix < rdsib_statep->rds_nsessions; ix++) {
1427 		ASSERT(sp != NULL);
1428 		if ((sp->session_state == RDS_SESSION_STATE_CONNECTED) &&
1429 		    (rds_check_n_unmark_port(sp, port, RDS_LOCAL))) {
1430 				ret = rds_post_control_message(sp,
1431 				    RDS_CTRL_CODE_UNSTALL, port);
1432 				if (ret != 0) {
1433 					(void) rds_check_n_mark_port(sp, port,
1434 					    RDS_LOCAL);
1435 				} else {
1436 					RDS_INCR_UNSTALLS_SENT();
1437 				}
1438 		}
1439 
1440 		sp = sp->session_nextp;
1441 	}
1442 
1443 	rw_exit(&rdsib_statep->rds_sessionlock);
1444 
1445 	RDS_DPRINTF4("rds_resume_port", "Return: Port %d", port);
1446 }
1447 
1448 static int
1449 rds_build_n_post_msg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1450     in_port_t recvport)
1451 {
1452 	ibt_send_wr_t	*wrp, wr;
1453 	rds_buf_t	*bp, *bp1;
1454 	rds_data_hdr_t	*pktp;
1455 	uint32_t	msgsize, npkts, residual, pktno, ix;
1456 	int		ret;
1457 
1458 	RDS_DPRINTF4("rds_build_n_post_msg", "Enter: EP(%p) UIOP(%p)",
1459 	    ep, uiop);
1460 
1461 	/* how many pkts are needed to carry this msg */
1462 	msgsize = uiop->uio_resid;
1463 	npkts = ((msgsize - 1) / UserBufferSize) + 1;
1464 	residual = ((msgsize - 1) % UserBufferSize) + 1;
1465 
1466 	RDS_DPRINTF5(LABEL, "EP(%p) UIOP(%p) msg size: %d npkts: %d", ep, uiop,
1467 	    msgsize, npkts);
1468 
1469 	/* Get the buffers needed to post this message */
1470 	bp = rds_get_send_buf(ep, npkts);
1471 	if (bp == NULL) {
1472 		RDS_INCR_ENOBUFS();
1473 		return (ENOBUFS);
1474 	}
1475 
1476 	if (npkts > 1) {
1477 		/*
1478 		 * multi-pkt messages are posted at the same time as a list
1479 		 * of WRs
1480 		 */
1481 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) *
1482 		    npkts, KM_SLEEP);
1483 	}
1484 
1485 
1486 	pktno = 0;
1487 	bp1 = bp;
1488 	do {
1489 		/* prepare the header */
1490 		pktp = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
1491 		pktp->dh_datalen = UserBufferSize;
1492 		pktp->dh_npkts = npkts - pktno;
1493 		pktp->dh_psn = pktno;
1494 		pktp->dh_sendport = sendport;
1495 		pktp->dh_recvport = recvport;
1496 		bp1->buf_ds.ds_len = RdsPktSize;
1497 
1498 		/* copy the data */
1499 		ret = uiomove((uint8_t *)pktp + RDS_DATA_HDR_SZ,
1500 		    UserBufferSize, UIO_WRITE, uiop);
1501 		if (ret != 0) {
1502 			break;
1503 		}
1504 
1505 		if (uiop->uio_resid == 0) {
1506 			pktp->dh_datalen = residual;
1507 			bp1->buf_ds.ds_len = residual + RDS_DATA_HDR_SZ;
1508 			break;
1509 		}
1510 		pktno++;
1511 		bp1 = bp1->buf_nextp;
1512 	} while (uiop->uio_resid);
1513 
1514 	if (ret) {
1515 		/* uiomove failed */
1516 		RDS_DPRINTF2("rds_build_n_post_msg", "UIO(%p) Move FAILED: %d",
1517 		    uiop, ret);
1518 		if (npkts > 1) {
1519 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1520 		}
1521 		rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1522 		return (ret);
1523 	}
1524 
1525 	if (npkts > 1) {
1526 		/* multi-pkt message */
1527 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Multiple Packets", ep);
1528 
1529 		bp1 = bp;
1530 		for (ix = 0; ix < npkts; ix++) {
1531 			wrp[ix].wr_id = (uintptr_t)bp1;
1532 			wrp[ix].wr_flags = IBT_WR_NO_FLAGS;
1533 			wrp[ix].wr_trans = IBT_RC_SRV;
1534 			wrp[ix].wr_opcode = IBT_WRC_SEND;
1535 			wrp[ix].wr_nds = 1;
1536 			wrp[ix].wr_sgl = &bp1->buf_ds;
1537 			bp1 = bp1->buf_nextp;
1538 		}
1539 		wrp[npkts - 1].wr_flags = IBT_WR_SEND_SOLICIT;
1540 
1541 		ret = ibt_post_send(ep->ep_chanhdl, wrp, npkts, &ix);
1542 		if (ret != IBT_SUCCESS) {
1543 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1544 			    "%d for %d pkts", ep, ret, npkts);
1545 			rds_free_send_buf(ep, bp, NULL, npkts, B_FALSE);
1546 			kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1547 			return (ret);
1548 		}
1549 
1550 		kmem_free(wrp, npkts * sizeof (ibt_send_wr_t));
1551 	} else {
1552 		/* single pkt */
1553 		RDS_DPRINTF5(LABEL, "EP(%p) Sending Single Packet", ep);
1554 		wr.wr_id = (uintptr_t)bp;
1555 		wr.wr_flags = IBT_WR_SEND_SOLICIT;
1556 		wr.wr_trans = IBT_RC_SRV;
1557 		wr.wr_opcode = IBT_WRC_SEND;
1558 		wr.wr_nds = 1;
1559 		wr.wr_sgl = &bp->buf_ds;
1560 		RDS_DPRINTF5(LABEL, "ds_va %p ds_key 0x%llx ds_len %d ",
1561 		    bp->buf_ds.ds_va, bp->buf_ds.ds_key, bp->buf_ds.ds_len);
1562 		ret = ibt_post_send(ep->ep_chanhdl, &wr, 1, NULL);
1563 		if (ret != IBT_SUCCESS) {
1564 			RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send failed: "
1565 			    "%d", ep, ret);
1566 			rds_free_send_buf(ep, bp, NULL, 1, B_FALSE);
1567 			return (ret);
1568 		}
1569 	}
1570 
1571 	RDS_INCR_TXPKTS(npkts);
1572 	RDS_INCR_TXBYTES(msgsize);
1573 
1574 	RDS_DPRINTF4("rds_build_n_post_msg", "Return: EP(%p) UIOP(%p)",
1575 	    ep, uiop);
1576 
1577 	return (0);
1578 }
1579 
1580 static int
1581 rds_deliver_loopback_msg(uio_t *uiop, ipaddr_t recvip, ipaddr_t sendip,
1582     in_port_t recvport, in_port_t sendport, zoneid_t zoneid)
1583 {
1584 	mblk_t		*mp;
1585 	int		ret;
1586 
1587 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Enter");
1588 
1589 	RDS_DPRINTF3(LABEL, "Loopback message: sendport: "
1590 	    "%d to recvport: %d", sendport, recvport);
1591 
1592 	mp = allocb(uiop->uio_resid, BPRI_MED);
1593 	if (mp == NULL) {
1594 		RDS_DPRINTF2(LABEL, "allocb failed, size: %d\n",
1595 		    uiop->uio_resid);
1596 		return (ENOSPC);
1597 	}
1598 	mp->b_wptr = mp->b_rptr + uiop->uio_resid;
1599 
1600 	ret = uiomove(mp->b_rptr, uiop->uio_resid, UIO_WRITE, uiop);
1601 	if (ret) {
1602 		RDS_DPRINTF2(LABEL, "ERROR: uiomove returned: %d", ret);
1603 		freeb(mp);
1604 		return (ret);
1605 	}
1606 
1607 	ret = rds_deliver_new_msg(mp, recvip, sendip, recvport, sendport,
1608 	    zoneid);
1609 	if (ret != 0) {
1610 		if (ret == ENOSPC) {
1611 			/*
1612 			 * The message is delivered but cannot take more,
1613 			 * stop further loopback traffic to this port
1614 			 */
1615 			RDS_DPRINTF3("rds_deliver_loopback_msg",
1616 			    "Port %d NO SPACE", recvport);
1617 			rds_stall_port(NULL, recvport, RDS_LOOPBACK);
1618 		} else {
1619 			RDS_DPRINTF2(LABEL, "Loopback message: port %d -> "
1620 			    "port %d failed: %d", sendport, recvport, ret);
1621 			return (ret);
1622 		}
1623 	}
1624 
1625 	RDS_DPRINTF4("rds_deliver_loopback_msg", "Return");
1626 	return (0);
1627 }
1628 
1629 static void
1630 rds_resend_messages(void *arg)
1631 {
1632 	rds_session_t	*sp = (rds_session_t *)arg;
1633 	rds_ep_t	*ep;
1634 	rds_bufpool_t	*spool;
1635 	rds_buf_t	*bp, *endp, *tmp;
1636 	ibt_send_wr_t	*wrp;
1637 	uint_t		nwr = 0, ix, jx;
1638 	int		ret;
1639 
1640 	RDS_DPRINTF2("rds_resend_messages", "Enter: SP(%p)", sp);
1641 
1642 	ep = &sp->session_dataep;
1643 
1644 	spool = &ep->ep_sndpool;
1645 	mutex_enter(&spool->pool_lock);
1646 
1647 	ASSERT(spool->pool_nfree == spool->pool_nbuffers);
1648 
1649 	if (ep->ep_lbufid == NULL) {
1650 		RDS_DPRINTF2("rds_resend_messages",
1651 		    "SP(%p) Remote session is cleaned up ", sp);
1652 		/*
1653 		 * The remote end cleaned up its session. There may be loss
1654 		 * of messages. Mark all buffers as acknowledged.
1655 		 */
1656 		tmp = spool->pool_tailp;
1657 	} else {
1658 		tmp = (rds_buf_t *)ep->ep_lbufid;
1659 		RDS_DPRINTF2("rds_resend_messages",
1660 		    "SP(%p) Last successful BP(%p) ", sp, tmp);
1661 	}
1662 
1663 	endp = spool->pool_tailp;
1664 	bp = spool->pool_headp;
1665 	jx = 0;
1666 	while ((bp != NULL) && (bp != tmp)) {
1667 		bp->buf_state = RDS_SNDBUF_FREE;
1668 		jx++;
1669 		bp = bp->buf_nextp;
1670 	}
1671 
1672 	if (bp == NULL) {
1673 		mutex_exit(&spool->pool_lock);
1674 		RDS_DPRINTF2("rds_resend_messages", "Alert: lbufid(%p) is not "
1675 		    "found in the list", tmp);
1676 
1677 		rw_enter(&sp->session_lock, RW_WRITER);
1678 		if (sp->session_state == RDS_SESSION_STATE_INIT) {
1679 			sp->session_state = RDS_SESSION_STATE_CONNECTED;
1680 		} else {
1681 			RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1682 			    "Expected State: %d", sp, sp->session_state,
1683 			    RDS_SESSION_STATE_CONNECTED);
1684 		}
1685 		sp->session_failover = 0;
1686 		rw_exit(&sp->session_lock);
1687 		return;
1688 	}
1689 
1690 	/* Found the match */
1691 	bp->buf_state = RDS_SNDBUF_FREE;
1692 	jx++;
1693 
1694 	spool->pool_tailp = bp;
1695 	bp = bp->buf_nextp;
1696 	spool->pool_tailp->buf_nextp = NULL;
1697 	nwr = spool->pool_nfree - jx;
1698 	spool->pool_nfree = jx;
1699 	mutex_exit(&spool->pool_lock);
1700 
1701 	RDS_DPRINTF2("rds_resend_messages", "SP(%p): Number of "
1702 	    "bufs (BP %p) to re-send: %d", sp, bp, nwr);
1703 
1704 	if (bp) {
1705 		wrp = (ibt_send_wr_t *)kmem_zalloc(sizeof (ibt_send_wr_t) * 100,
1706 		    KM_SLEEP);
1707 
1708 		while (nwr) {
1709 			jx = (nwr > 100) ? 100 : nwr;
1710 
1711 			tmp = bp;
1712 			for (ix = 0; ix < jx; ix++) {
1713 				bp->buf_state = RDS_SNDBUF_PENDING;
1714 				wrp[ix].wr_id = (uintptr_t)bp;
1715 				wrp[ix].wr_flags = IBT_WR_SEND_SOLICIT;
1716 				wrp[ix].wr_trans = IBT_RC_SRV;
1717 				wrp[ix].wr_opcode = IBT_WRC_SEND;
1718 				wrp[ix].wr_nds = 1;
1719 				wrp[ix].wr_sgl = &bp->buf_ds;
1720 				bp = bp->buf_nextp;
1721 			}
1722 
1723 			ret = ibt_post_send(ep->ep_chanhdl, wrp, jx, &ix);
1724 			if (ret != IBT_SUCCESS) {
1725 				RDS_DPRINTF2(LABEL, "EP(%p): ibt_post_send "
1726 				    "failed: %d for % pkts", ep, ret, jx);
1727 				break;
1728 			}
1729 
1730 			mutex_enter(&spool->pool_lock);
1731 			spool->pool_nbusy += jx;
1732 			mutex_exit(&spool->pool_lock);
1733 
1734 			nwr -= jx;
1735 		}
1736 
1737 		kmem_free(wrp, sizeof (ibt_send_wr_t) * 100);
1738 
1739 		if (nwr != 0) {
1740 
1741 			/*
1742 			 * An error while failover is in progress. Some WRs are
1743 			 * posted while other remain. If any of the posted WRs
1744 			 * complete in error then they would dispatch a taskq to
1745 			 * do a failover. Getting the session lock will prevent
1746 			 * the taskq to wait until we are done here.
1747 			 */
1748 			rw_enter(&sp->session_lock, RW_READER);
1749 
1750 			/*
1751 			 * Wait until all the previous WRs are completed and
1752 			 * then queue the remaining, otherwise the order of
1753 			 * the messages may change.
1754 			 */
1755 			(void) rds_is_sendq_empty(ep, 1);
1756 
1757 			/* free the remaining buffers */
1758 			rds_free_send_buf(ep, tmp, endp, nwr, B_FALSE);
1759 
1760 			rw_exit(&sp->session_lock);
1761 			return;
1762 		}
1763 	}
1764 
1765 	rw_enter(&sp->session_lock, RW_WRITER);
1766 	if (sp->session_state == RDS_SESSION_STATE_INIT) {
1767 		sp->session_state = RDS_SESSION_STATE_CONNECTED;
1768 	} else {
1769 		RDS_DPRINTF2("rds_resend_messages", "SP(%p) State: %d "
1770 		    "Expected State: %d", sp, sp->session_state,
1771 		    RDS_SESSION_STATE_CONNECTED);
1772 	}
1773 	sp->session_failover = 0;
1774 	rw_exit(&sp->session_lock);
1775 
1776 	RDS_DPRINTF2("rds_resend_messages", "Return: SP(%p)", sp);
1777 }
1778 
1779 /*
1780  * This is called when a channel is connected. Transition the session to
1781  * CONNECTED state iff both channels are connected.
1782  */
1783 void
1784 rds_session_active(rds_session_t *sp)
1785 {
1786 	rds_ep_t	*ep;
1787 	uint_t		failover;
1788 
1789 	RDS_DPRINTF2("rds_session_active", "Enter: 0x%p", sp);
1790 
1791 	rw_enter(&sp->session_lock, RW_READER);
1792 
1793 	failover = sp->session_failover;
1794 
1795 	/*
1796 	 * we establish the data channel first, so check the control channel
1797 	 * first but make sure it is initialized.
1798 	 */
1799 	ep = &sp->session_ctrlep;
1800 	mutex_enter(&ep->ep_lock);
1801 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1802 		/* the session is not ready yet */
1803 		mutex_exit(&ep->ep_lock);
1804 		rw_exit(&sp->session_lock);
1805 		return;
1806 	}
1807 	mutex_exit(&ep->ep_lock);
1808 
1809 	/* control channel is connected, check the data channel */
1810 	ep = &sp->session_dataep;
1811 	mutex_enter(&ep->ep_lock);
1812 	if (ep->ep_state != RDS_EP_STATE_CONNECTED) {
1813 		/* data channel is not yet connected */
1814 		mutex_exit(&ep->ep_lock);
1815 		rw_exit(&sp->session_lock);
1816 		return;
1817 	}
1818 	mutex_exit(&ep->ep_lock);
1819 
1820 	if (failover) {
1821 		rw_exit(&sp->session_lock);
1822 
1823 		/*
1824 		 * The session has failed over. Previous msgs have to be
1825 		 * re-sent before the session is moved to the connected
1826 		 * state.
1827 		 */
1828 		RDS_DPRINTF2("rds_session_active", "SP(%p) Dispatching taskq "
1829 		    "to re-send messages", sp);
1830 		(void) ddi_taskq_dispatch(rds_taskq,
1831 		    rds_resend_messages, (void *)sp, DDI_SLEEP);
1832 		return;
1833 	}
1834 
1835 	/* the session is ready */
1836 	sp->session_state = RDS_SESSION_STATE_CONNECTED;
1837 	RDS_DPRINTF3("rds_session_active",
1838 	    "SP(%p) State RDS_SESSION_STATE_CONNECTED", sp);
1839 
1840 	rw_exit(&sp->session_lock);
1841 
1842 	RDS_DPRINTF2("rds_session_active", "Return: SP(%p) is CONNECTED", sp);
1843 }
1844 
1845 static int
1846 rds_ep_sendmsg(rds_ep_t *ep, uio_t *uiop, in_port_t sendport,
1847     in_port_t recvport)
1848 {
1849 	int	ret;
1850 
1851 	RDS_DPRINTF4("rds_ep_sendmsg", "Enter: EP(%p) sendport: %d recvport: "
1852 	    "%d", ep, sendport, recvport);
1853 
1854 	/* make sure the remote port is not stalled */
1855 	if (rds_is_port_marked(ep->ep_sp, recvport, RDS_REMOTE)) {
1856 		RDS_DPRINTF2(LABEL, "SP(%p) Port:%d is in stall state",
1857 		    ep->ep_sp, recvport);
1858 		RDS_INCR_EWOULDBLOCK();
1859 		ret = ENOMEM;
1860 	} else {
1861 		ret = rds_build_n_post_msg(ep, uiop, sendport, recvport);
1862 	}
1863 
1864 	RDS_DPRINTF4("rds_ep_sendmsg", "Return: EP(%p)", ep);
1865 
1866 	return (ret);
1867 }
1868 
1869 /* Send a message to a destination socket */
1870 int
1871 rds_sendmsg(uio_t *uiop, ipaddr_t sendip, ipaddr_t recvip, in_port_t sendport,
1872     in_port_t recvport, zoneid_t zoneid)
1873 {
1874 	rds_session_t	*sp;
1875 	ib_gid_t	lgid, rgid;
1876 	int		ret;
1877 
1878 	RDS_DPRINTF4("rds_sendmsg", "Enter: uiop: 0x%p, srcIP: 0x%x destIP: "
1879 	    "0x%x sndport: %d recvport: %d", uiop, sendip, recvip,
1880 	    sendport, recvport);
1881 
1882 	/* If msg length is 0, just return success */
1883 	if (uiop->uio_resid == 0) {
1884 		RDS_DPRINTF2("rds_sendmsg", "Zero sized message");
1885 		return (0);
1886 	}
1887 
1888 	/* Is there a session to the destination? */
1889 	rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1890 	sp = rds_session_lkup(rdsib_statep, recvip, 0);
1891 	rw_exit(&rdsib_statep->rds_sessionlock);
1892 
1893 	/* Is this a loopback message? */
1894 	if ((sp == NULL) && (rds_islocal(recvip))) {
1895 		/* make sure the port is not stalled */
1896 		if (rds_is_port_marked(NULL, recvport, RDS_LOOPBACK)) {
1897 			RDS_DPRINTF2(LABEL, "Local Port:%d is in stall state",
1898 			    recvport);
1899 			RDS_INCR_EWOULDBLOCK();
1900 			return (ENOMEM);
1901 		}
1902 		ret = rds_deliver_loopback_msg(uiop, recvip, sendip, recvport,
1903 		    sendport, zoneid);
1904 		return (ret);
1905 	}
1906 
1907 	/* Not a loopback message */
1908 	if (sp == NULL) {
1909 		/* There is no session to the destination, create one. */
1910 		RDS_DPRINTF3(LABEL, "There is no session to the destination "
1911 		    "IP: 0x%x", recvip);
1912 		sp = rds_session_create(rdsib_statep, sendip, recvip, NULL,
1913 		    RDS_SESSION_ACTIVE);
1914 		if (sp != NULL) {
1915 			rw_enter(&sp->session_lock, RW_WRITER);
1916 			if (sp->session_type == RDS_SESSION_ACTIVE) {
1917 				ret = rds_session_init(sp);
1918 				if (ret != 0) {
1919 					RDS_DPRINTF2("rds_sendmsg",
1920 					    "SP(%p): rds_session_init failed",
1921 					    sp);
1922 					sp->session_state =
1923 					    RDS_SESSION_STATE_FAILED;
1924 					RDS_DPRINTF3("rds_sendmsg",
1925 					    "SP(%p) State "
1926 					    "RDS_SESSION_STATE_FAILED", sp);
1927 					rw_exit(&sp->session_lock);
1928 					return (EFAULT);
1929 				}
1930 				sp->session_state = RDS_SESSION_STATE_INIT;
1931 				RDS_DPRINTF3("rds_sendmsg",
1932 				    "SP(%p) State "
1933 				    "RDS_SESSION_STATE_INIT", sp);
1934 				rw_exit(&sp->session_lock);
1935 				rds_session_open(sp);
1936 			} else {
1937 				rw_exit(&sp->session_lock);
1938 			}
1939 		} else {
1940 			/* Is a session created for this destination */
1941 			rw_enter(&rdsib_statep->rds_sessionlock, RW_READER);
1942 			sp = rds_session_lkup(rdsib_statep, recvip, 0);
1943 			rw_exit(&rdsib_statep->rds_sessionlock);
1944 			if (sp == NULL) {
1945 				return (EFAULT);
1946 			}
1947 		}
1948 	}
1949 
1950 	/* There is a session to the destination */
1951 	rw_enter(&sp->session_lock, RW_READER);
1952 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
1953 		rw_exit(&sp->session_lock);
1954 
1955 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
1956 		    recvport);
1957 		return (ret);
1958 	} else if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1959 	    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1960 		ipaddr_t sendip1, recvip1;
1961 
1962 		RDS_DPRINTF3("rds_sendmsg", "SP(%p) is not connected, State: "
1963 		    "%d", sp);
1964 		rw_exit(&sp->session_lock);
1965 		rw_enter(&sp->session_lock, RW_WRITER);
1966 		if ((sp->session_state == RDS_SESSION_STATE_FAILED) ||
1967 		    (sp->session_state == RDS_SESSION_STATE_FINI)) {
1968 			ibt_ip_path_attr_t	ipattr;
1969 			ibt_ip_addr_t		dstip;
1970 
1971 			sp->session_state = RDS_SESSION_STATE_CREATED;
1972 			sp->session_type = RDS_SESSION_ACTIVE;
1973 			RDS_DPRINTF3("rds_sendmsg", "SP(%p) State "
1974 			    "RDS_SESSION_STATE_CREATED", sp);
1975 			rw_exit(&sp->session_lock);
1976 
1977 
1978 			/* The ipaddr should be in the network order */
1979 			sendip1 = sendip;
1980 			recvip1 = recvip;
1981 			ret = rds_sc_path_lookup(&sendip1, &recvip1);
1982 			if (ret == 0) {
1983 				RDS_DPRINTF2(LABEL, "Path not found "
1984 				    "(0x%x 0x%x)", sendip1, recvip1);
1985 			}
1986 
1987 			/* Resolve the IP addresses */
1988 			lgid.gid_prefix = 0;
1989 			lgid.gid_guid = 0;
1990 			rgid.gid_prefix = 0;
1991 			rgid.gid_guid = 0;
1992 
1993 			bzero(&ipattr, sizeof (ibt_ip_path_attr_t));
1994 			dstip.family = AF_INET;
1995 			dstip.un.ip4addr = htonl(recvip1);
1996 			ipattr.ipa_dst_ip = &dstip;
1997 			ipattr.ipa_src_ip.family = AF_INET;
1998 			ipattr.ipa_src_ip.un.ip4addr = htonl(sendip1);
1999 			ipattr.ipa_ndst = 1;
2000 			ipattr.ipa_max_paths = 1;
2001 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths: 0x%x <-> 0x%x ",
2002 			    sendip1, recvip1);
2003 			ret = ibt_get_ip_paths(rdsib_statep->rds_ibhdl,
2004 			    IBT_PATH_NO_FLAGS, &ipattr, &sp->session_pinfo,
2005 			    NULL, NULL);
2006 			if (ret != IBT_SUCCESS) {
2007 				RDS_DPRINTF1("rds_sendmsg",
2008 				    "ibt_get_ip_paths failed, ret: %d ", ret);
2009 
2010 				rw_enter(&sp->session_lock, RW_WRITER);
2011 				if (sp->session_type == RDS_SESSION_ACTIVE) {
2012 					sp->session_state =
2013 					    RDS_SESSION_STATE_FAILED;
2014 					RDS_DPRINTF3("rds_sendmsg",
2015 					    "SP(%p) State "
2016 					    "RDS_SESSION_STATE_FAILED", sp);
2017 					rw_exit(&sp->session_lock);
2018 					return (EFAULT);
2019 				} else {
2020 					rw_exit(&sp->session_lock);
2021 					return (ENOMEM);
2022 				}
2023 			}
2024 			RDS_DPRINTF2(LABEL, "ibt_get_ip_paths success");
2025 			lgid = sp->session_pinfo.
2026 			    pi_prim_cep_path.cep_adds_vect.av_sgid;
2027 			rgid = sp->session_pinfo.
2028 			    pi_prim_cep_path.cep_adds_vect.av_dgid;
2029 
2030 			RDS_DPRINTF2(LABEL, "lgid: %llx:%llx rgid: %llx:%llx",
2031 			    lgid.gid_prefix, lgid.gid_guid, rgid.gid_prefix,
2032 			    rgid.gid_guid);
2033 
2034 			rw_enter(&sp->session_lock, RW_WRITER);
2035 			if (sp->session_type == RDS_SESSION_ACTIVE) {
2036 				sp->session_lgid = lgid;
2037 				sp->session_rgid = rgid;
2038 				ret = rds_session_init(sp);
2039 				if (ret != 0) {
2040 					RDS_DPRINTF2("rds_sendmsg",
2041 					    "SP(%p): rds_session_init failed",
2042 					    sp);
2043 					sp->session_state =
2044 					    RDS_SESSION_STATE_FAILED;
2045 					RDS_DPRINTF3("rds_sendmsg",
2046 					    "SP(%p) State "
2047 					    "RDS_SESSION_STATE_FAILED", sp);
2048 					rw_exit(&sp->session_lock);
2049 					return (EFAULT);
2050 				}
2051 				sp->session_state = RDS_SESSION_STATE_INIT;
2052 				rw_exit(&sp->session_lock);
2053 
2054 				rds_session_open(sp);
2055 
2056 			} else {
2057 				RDS_DPRINTF2("rds_sendmsg",
2058 				    "SP(%p): type changed to %d",
2059 				    sp, sp->session_type);
2060 				rw_exit(&sp->session_lock);
2061 				return (ENOMEM);
2062 			}
2063 		} else {
2064 			RDS_DPRINTF2("rds_sendmsg",
2065 			    "SP(%p): Session state %d changed",
2066 			    sp, sp->session_state);
2067 			rw_exit(&sp->session_lock);
2068 			return (ENOMEM);
2069 		}
2070 	} else {
2071 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): Session is in %d state",
2072 		    sp, sp->session_state);
2073 		rw_exit(&sp->session_lock);
2074 		return (ENOMEM);
2075 	}
2076 
2077 	rw_enter(&sp->session_lock, RW_READER);
2078 	if (sp->session_state == RDS_SESSION_STATE_CONNECTED) {
2079 		rw_exit(&sp->session_lock);
2080 
2081 		ret = rds_ep_sendmsg(&sp->session_dataep, uiop, sendport,
2082 		    recvport);
2083 	} else {
2084 		RDS_DPRINTF2("rds_sendmsg", "SP(%p): state(%d) not connected",
2085 		    sp, sp->session_state);
2086 		rw_exit(&sp->session_lock);
2087 	}
2088 
2089 	RDS_DPRINTF4("rds_sendmsg", "Return: SP(%p) ret: %d", sp, ret);
2090 
2091 	return (ret);
2092 }
2093 
2094 /* Note: This is called on the CQ handler thread */
2095 void
2096 rds_received_msg(rds_ep_t *ep, rds_buf_t *bp)
2097 {
2098 	mblk_t		*mp, *mp1;
2099 	rds_data_hdr_t	*pktp, *pktp1;
2100 	uint8_t		*datap;
2101 	rds_buf_t	*bp1;
2102 	rds_bufpool_t	*rpool;
2103 	uint_t		npkts, ix;
2104 	int		ret;
2105 
2106 	RDS_DPRINTF4("rds_received_msg", "Enter: EP(%p)", ep);
2107 
2108 	pktp = (rds_data_hdr_t *)(uintptr_t)bp->buf_ds.ds_va;
2109 	datap = ((uint8_t *)(uintptr_t)bp->buf_ds.ds_va) + RDS_DATA_HDR_SZ;
2110 	npkts = pktp->dh_npkts;
2111 
2112 	/* increment rx pending here */
2113 	rpool = &ep->ep_rcvpool;
2114 	mutex_enter(&rpool->pool_lock);
2115 	rpool->pool_nbusy += npkts;
2116 	mutex_exit(&rpool->pool_lock);
2117 
2118 	/* this will get freed by sockfs */
2119 	mp = esballoc(datap, pktp->dh_datalen, BPRI_HI, &bp->buf_frtn);
2120 	if (mp == NULL) {
2121 		RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2122 		    ep, bp);
2123 		rds_free_recv_buf(bp, npkts);
2124 		return;
2125 	}
2126 	mp->b_wptr = datap + pktp->dh_datalen;
2127 	mp->b_datap->db_type = M_DATA;
2128 
2129 	mp1 = mp;
2130 	bp1 = bp->buf_nextp;
2131 	while (bp1 != NULL) {
2132 		pktp1 = (rds_data_hdr_t *)(uintptr_t)bp1->buf_ds.ds_va;
2133 		datap = ((uint8_t *)(uintptr_t)bp1->buf_ds.ds_va) +
2134 		    RDS_DATA_HDR_SZ;
2135 
2136 		mp1->b_cont = esballoc(datap, pktp1->dh_datalen,
2137 		    BPRI_HI, &bp1->buf_frtn);
2138 		if (mp1->b_cont == NULL) {
2139 			RDS_DPRINTF2(LABEL, "EP(%p) BP(%p): allocb failed",
2140 			    ep, bp1);
2141 			freemsg(mp);
2142 			rds_free_recv_buf(bp1, pktp1->dh_npkts);
2143 			return;
2144 		}
2145 		mp1 = mp1->b_cont;
2146 		mp1->b_wptr = datap + pktp1->dh_datalen;
2147 		mp1->b_datap->db_type = M_DATA;
2148 
2149 		bp1 = bp1->buf_nextp;
2150 	}
2151 
2152 	RDS_INCR_RXPKTS_PEND(npkts);
2153 	RDS_INCR_RXPKTS(npkts);
2154 	RDS_INCR_RXBYTES(msgdsize(mp));
2155 
2156 	RDS_DPRINTF5(LABEL, "Deliver Message: sendIP: 0x%x recvIP: 0x%x "
2157 	    "sendport: %d recvport: %d npkts: %d pktno: %d", ep->ep_remip,
2158 	    ep->ep_myip, pktp->dh_sendport, pktp->dh_recvport,
2159 	    npkts, pktp->dh_psn);
2160 
2161 	/* store the last buffer id, no lock needed */
2162 	if (npkts > 1) {
2163 		ep->ep_rbufid = pktp1->dh_bufid;
2164 	} else {
2165 		ep->ep_rbufid = pktp->dh_bufid;
2166 	}
2167 
2168 	ret = rds_deliver_new_msg(mp, ep->ep_myip, ep->ep_remip,
2169 	    pktp->dh_recvport, pktp->dh_sendport, ALL_ZONES);
2170 	if (ret != 0) {
2171 		if (ret == ENOSPC) {
2172 			/*
2173 			 * The message is delivered but cannot take more,
2174 			 * stop further remote messages coming to this port
2175 			 */
2176 			RDS_DPRINTF3("rds_received_msg", "Port %d NO SPACE",
2177 			    pktp->dh_recvport);
2178 			rds_stall_port(ep->ep_sp, pktp->dh_recvport, RDS_LOCAL);
2179 		} else {
2180 			RDS_DPRINTF1(LABEL, "rds_deliver_new_msg returned: %d",
2181 			    ret);
2182 		}
2183 	}
2184 
2185 	mutex_enter(&ep->ep_lock);
2186 	if (ep->ep_rdmacnt == 0) {
2187 		ep->ep_rdmacnt++;
2188 		*(uintptr_t *)(uintptr_t)ep->ep_ackds.ds_va = ep->ep_rbufid;
2189 		mutex_exit(&ep->ep_lock);
2190 
2191 		/* send acknowledgement */
2192 		RDS_INCR_TXACKS();
2193 		ret = ibt_post_send(ep->ep_chanhdl, &ep->ep_ackwr, 1, &ix);
2194 		if (ret != IBT_SUCCESS) {
2195 			RDS_DPRINTF1(LABEL, "EP(%p): ibt_post_send for "
2196 			    "acknowledgement failed: %d, SQ depth: %d",
2197 			    ep, ret, ep->ep_sndpool.pool_nbusy);
2198 			mutex_enter(&ep->ep_lock);
2199 			ep->ep_rdmacnt--;
2200 			mutex_exit(&ep->ep_lock);
2201 		}
2202 	} else {
2203 		/* no room to send acknowledgement */
2204 		mutex_exit(&ep->ep_lock);
2205 	}
2206 
2207 	RDS_DPRINTF4("rds_received_msg", "Return: EP(%p)", ep);
2208 }
2209