xref: /titanic_50/usr/src/uts/sun4v/io/vsw_txdring.c (revision f441771b0ce9f9d6122d318ff8290cb1a2848f9d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 #include <sys/types.h>
27 #include <sys/errno.h>
28 #include <sys/sysmacros.h>
29 #include <sys/param.h>
30 #include <sys/machsystm.h>
31 #include <sys/stream.h>
32 #include <sys/strsubr.h>
33 #include <sys/kmem.h>
34 #include <sys/strsun.h>
35 #include <sys/callb.h>
36 #include <sys/sdt.h>
37 #include <sys/mach_descrip.h>
38 #include <sys/mdeg.h>
39 #include <net/if.h>
40 #include <sys/vsw.h>
41 #include <sys/vio_mailbox.h>
42 #include <sys/vio_common.h>
43 #include <sys/vnet_common.h>
44 #include <sys/vnet_mailbox.h>
45 #include <sys/vio_util.h>
46 
47 /*
48  * This file contains the implementation of TxDring data transfer mode of VIO
49  * Protocol in vsw. The functions in this file are invoked from vsw_ldc.c
50  * after TxDring mode is negotiated with the peer during attribute phase of
51  * handshake. This file contains functions that setup the transmit and receive
52  * descriptor rings, and associated resources in TxDring mode. It also contains
53  * the transmit and receive data processing functions that are invoked in
54  * TxDring mode.
55  */
56 
57 /* Functions exported to vsw_ldc.c */
58 vio_dring_reg_msg_t *vsw_create_tx_dring_info(vsw_ldc_t *);
59 int vsw_setup_tx_dring(vsw_ldc_t *ldcp, dring_info_t *dp);
60 void vsw_destroy_tx_dring(vsw_ldc_t *ldcp);
61 dring_info_t *vsw_map_rx_dring(vsw_ldc_t *ldcp, void *pkt);
62 void vsw_unmap_rx_dring(vsw_ldc_t *ldcp);
63 int vsw_dringsend(vsw_ldc_t *, mblk_t *);
64 void vsw_ldc_msg_worker(void *arg);
65 void vsw_stop_msg_thread(vsw_ldc_t *ldcp);
66 void vsw_process_dringdata(void *, void *);
67 int vsw_send_msg(vsw_ldc_t *, void *, int, boolean_t);
68 int vsw_reclaim_dring(dring_info_t *dp, int start);
69 int vsw_dring_find_free_desc(dring_info_t *, vsw_private_desc_t **, int *);
70 
71 /* Internal functions */
72 static int vsw_init_multipools(vsw_ldc_t *ldcp, vsw_t *vswp);
73 static dring_info_t *vsw_create_tx_dring(vsw_ldc_t *);
74 
75 /* Functions imported from vsw_ldc.c */
76 extern void vsw_process_pkt(void *);
77 extern void vsw_destroy_rxpools(void *);
78 extern dring_info_t *vsw_map_dring_cmn(vsw_ldc_t *ldcp,
79     vio_dring_reg_msg_t *dring_pkt);
80 extern void vsw_process_conn_evt(vsw_ldc_t *, uint16_t);
81 extern mblk_t *vsw_vlan_frame_pretag(void *arg, int type, mblk_t *mp);
82 
83 /* Tunables */
84 extern int vsw_wretries;
85 extern int vsw_recv_delay;
86 extern int vsw_recv_retries;
87 extern boolean_t vsw_jumbo_rxpools;
88 extern uint32_t vsw_chain_len;
89 extern uint32_t vsw_num_descriptors;
90 extern uint32_t vsw_mblk_size1;
91 extern uint32_t vsw_mblk_size2;
92 extern uint32_t vsw_mblk_size3;
93 extern uint32_t vsw_mblk_size4;
94 extern uint32_t vsw_num_mblks1;
95 extern uint32_t vsw_num_mblks2;
96 extern uint32_t vsw_num_mblks3;
97 extern uint32_t vsw_num_mblks4;
98 
99 #define	VSW_NUM_VMPOOLS		3	/* number of vio mblk pools */
100 
/*
 * Turn the dring message `pkt' into a NACK stamped with our local session
 * id and send it back to the peer over `ldcp'.  The result of the send is
 * deliberately ignored; vsw_send_msg() is asked (handle_reset == B_TRUE) to
 * deal with a channel reset itself.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and can be used safely in an unbraced if/else.
 * Callers terminate the invocation with a semicolon.
 */
#define	SND_DRING_NACK(ldcp, pkt) \
	do { \
		(pkt)->tag.vio_subtype = VIO_SUBTYPE_NACK; \
		(pkt)->tag.vio_sid = (ldcp)->local_session; \
		(void) vsw_send_msg((ldcp), (void *)(pkt), \
		    sizeof (vio_dring_msg_t), B_TRUE); \
	} while (0)
106 
107 vio_dring_reg_msg_t *
108 vsw_create_tx_dring_info(vsw_ldc_t *ldcp)
109 {
110 	vio_dring_reg_msg_t	*mp;
111 	dring_info_t		*dp;
112 	vsw_t			*vswp = ldcp->ldc_vswp;
113 
114 	D1(vswp, "%s enter\n", __func__);
115 
116 	/*
117 	 * If we can't create a dring, obviously no point sending
118 	 * a message.
119 	 */
120 	if ((dp = vsw_create_tx_dring(ldcp)) == NULL)
121 		return (NULL);
122 
123 	mp = kmem_zalloc(sizeof (vio_dring_reg_msg_t), KM_SLEEP);
124 
125 	mp->tag.vio_msgtype = VIO_TYPE_CTRL;
126 	mp->tag.vio_subtype = VIO_SUBTYPE_INFO;
127 	mp->tag.vio_subtype_env = VIO_DRING_REG;
128 	mp->tag.vio_sid = ldcp->local_session;
129 
130 	/* payload */
131 	mp->num_descriptors = dp->num_descriptors;
132 	mp->descriptor_size = dp->descriptor_size;
133 	mp->options = dp->options;
134 	mp->ncookies = dp->dring_ncookies;
135 	bcopy(&dp->dring_cookie[0], &mp->cookie[0], sizeof (ldc_mem_cookie_t));
136 
137 	mp->dring_ident = 0;
138 
139 	D1(vswp, "%s exit\n", __func__);
140 
141 	return (mp);
142 }
143 
144 /*
145  * Allocate transmit resources for the channel. The resources consist of a
146  * transmit descriptor ring and an associated transmit buffer area.
147  */
148 static dring_info_t *
149 vsw_create_tx_dring(vsw_ldc_t *ldcp)
150 {
151 	vsw_t			*vswp = ldcp->ldc_vswp;
152 	ldc_mem_info_t		minfo;
153 	dring_info_t		*dp;
154 
155 	dp = (dring_info_t *)kmem_zalloc(sizeof (dring_info_t), KM_SLEEP);
156 	mutex_init(&dp->dlock, NULL, MUTEX_DRIVER, NULL);
157 	mutex_init(&dp->restart_lock, NULL, MUTEX_DRIVER, NULL);
158 	ldcp->lane_out.dringp = dp;
159 
160 	/* create public section of ring */
161 	if ((ldc_mem_dring_create(vsw_num_descriptors,
162 	    sizeof (vnet_public_desc_t), &dp->dring_handle)) != 0) {
163 
164 		DERR(vswp, "vsw_create_tx_dring(%lld): ldc dring create "
165 		    "failed", ldcp->ldc_id);
166 		goto fail;
167 	}
168 	ASSERT(dp->dring_handle != NULL);
169 
170 	/*
171 	 * Get the base address of the public section of the ring.
172 	 */
173 	if ((ldc_mem_dring_info(dp->dring_handle, &minfo)) != 0) {
174 		DERR(vswp, "vsw_create_tx_dring(%lld): dring info failed\n",
175 		    ldcp->ldc_id);
176 		goto fail;
177 	} else {
178 		ASSERT(minfo.vaddr != 0);
179 		dp->pub_addr = minfo.vaddr;
180 	}
181 
182 	dp->num_descriptors = vsw_num_descriptors;
183 	dp->descriptor_size = sizeof (vnet_public_desc_t);
184 	dp->options = VIO_TX_DRING;
185 	dp->dring_ncookies = 1;	/* guaranteed by ldc */
186 
187 	/*
188 	 * create private portion of ring
189 	 */
190 	dp->priv_addr = (vsw_private_desc_t *)kmem_zalloc(
191 	    (sizeof (vsw_private_desc_t) * vsw_num_descriptors), KM_SLEEP);
192 
193 	if (vsw_setup_tx_dring(ldcp, dp)) {
194 		DERR(vswp, "%s: unable to setup ring", __func__);
195 		goto fail;
196 	}
197 
198 	/* bind dring to the channel */
199 	if ((ldc_mem_dring_bind(ldcp->ldc_handle, dp->dring_handle,
200 	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
201 	    &dp->dring_cookie[0], &dp->dring_ncookies)) != 0) {
202 		DERR(vswp, "vsw_create_tx_dring: unable to bind to channel "
203 		    "%lld", ldcp->ldc_id);
204 		goto fail;
205 	}
206 
207 	/* haven't used any descriptors yet */
208 	dp->end_idx = 0;
209 	dp->last_ack_recv = -1;
210 	dp->restart_reqd = B_TRUE;
211 
212 	return (dp);
213 
214 fail:
215 	vsw_destroy_tx_dring(ldcp);
216 	return (NULL);
217 }
218 
219 /*
220  * Setup the descriptors in the tx dring.
221  * Returns 0 on success, 1 on failure.
222  */
223 int
224 vsw_setup_tx_dring(vsw_ldc_t *ldcp, dring_info_t *dp)
225 {
226 	vnet_public_desc_t	*pub_addr = NULL;
227 	vsw_private_desc_t	*priv_addr = NULL;
228 	vsw_t			*vswp = ldcp->ldc_vswp;
229 	uint64_t		*tmpp;
230 	uint64_t		offset = 0;
231 	uint32_t		ncookies = 0;
232 	static char		*name = "vsw_setup_ring";
233 	int			i, j, nc, rv;
234 	size_t			data_sz;
235 	void			*data_addr;
236 
237 	priv_addr = dp->priv_addr;
238 	pub_addr = dp->pub_addr;
239 
240 	/* public section may be null but private should never be */
241 	ASSERT(priv_addr != NULL);
242 
243 	/*
244 	 * Allocate the region of memory which will be used to hold
245 	 * the data the descriptors will refer to.
246 	 */
247 	data_sz = vswp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
248 
249 	/*
250 	 * In order to ensure that the number of ldc cookies per descriptor is
251 	 * limited to be within the default MAX_COOKIES (2), we take the steps
252 	 * outlined below:
253 	 *
254 	 * Align the entire data buffer area to 8K and carve out per descriptor
255 	 * data buffers starting from this 8K aligned base address.
256 	 *
257 	 * We round up the mtu specified to be a multiple of 2K or 4K.
258 	 * For sizes up to 12K we round up the size to the next 2K.
259 	 * For sizes > 12K we round up to the next 4K (otherwise sizes such as
260 	 * 14K could end up needing 3 cookies, with the buffer spread across
261 	 * 3 8K pages:  8K+6K, 2K+8K+2K, 6K+8K, ...).
262 	 */
263 	if (data_sz <= VNET_12K) {
264 		data_sz = VNET_ROUNDUP_2K(data_sz);
265 	} else {
266 		data_sz = VNET_ROUNDUP_4K(data_sz);
267 	}
268 
269 	dp->desc_data_sz = data_sz;
270 
271 	/* allocate extra 8K bytes for alignment */
272 	dp->data_sz = (vsw_num_descriptors * data_sz) + VNET_8K;
273 	data_addr = kmem_alloc(dp->data_sz, KM_SLEEP);
274 	dp->data_addr = data_addr;
275 
276 	D2(vswp, "%s: allocated %lld bytes at 0x%llx\n", name,
277 	    dp->data_sz, dp->data_addr);
278 
279 	/* align the starting address of the data area to 8K */
280 	data_addr = (void *)VNET_ROUNDUP_8K((uintptr_t)data_addr);
281 
282 	tmpp = (uint64_t *)data_addr;
283 	offset = dp->desc_data_sz/sizeof (tmpp);
284 
285 	/*
286 	 * Initialise some of the private and public (if they exist)
287 	 * descriptor fields.
288 	 */
289 	for (i = 0; i < vsw_num_descriptors; i++) {
290 		mutex_init(&priv_addr->dstate_lock, NULL, MUTEX_DRIVER, NULL);
291 
292 		if ((ldc_mem_alloc_handle(ldcp->ldc_handle,
293 		    &priv_addr->memhandle)) != 0) {
294 			DERR(vswp, "%s: alloc mem handle failed", name);
295 			goto fail;
296 		}
297 
298 		priv_addr->datap = (void *)tmpp;
299 
300 		rv = ldc_mem_bind_handle(priv_addr->memhandle,
301 		    (caddr_t)priv_addr->datap, dp->desc_data_sz,
302 		    LDC_SHADOW_MAP, LDC_MEM_R|LDC_MEM_W,
303 		    &(priv_addr->memcookie[0]), &ncookies);
304 		if (rv != 0) {
305 			DERR(vswp, "%s(%lld): ldc_mem_bind_handle failed "
306 			    "(rv %d)", name, ldcp->ldc_id, rv);
307 			goto fail;
308 		}
309 		priv_addr->bound = 1;
310 
311 		D2(vswp, "%s: %d: memcookie 0 : addr 0x%llx : size 0x%llx",
312 		    name, i, priv_addr->memcookie[0].addr,
313 		    priv_addr->memcookie[0].size);
314 
315 		if (ncookies >= (uint32_t)(VSW_MAX_COOKIES + 1)) {
316 			DERR(vswp, "%s(%lld) ldc_mem_bind_handle returned "
317 			    "invalid num of cookies (%d) for size 0x%llx",
318 			    name, ldcp->ldc_id, ncookies, VSW_RING_EL_DATA_SZ);
319 
320 			goto fail;
321 		} else {
322 			for (j = 1; j < ncookies; j++) {
323 				rv = ldc_mem_nextcookie(priv_addr->memhandle,
324 				    &(priv_addr->memcookie[j]));
325 				if (rv != 0) {
326 					DERR(vswp, "%s: ldc_mem_nextcookie "
327 					    "failed rv (%d)", name, rv);
328 					goto fail;
329 				}
330 				D3(vswp, "%s: memcookie %d : addr 0x%llx : "
331 				    "size 0x%llx", name, j,
332 				    priv_addr->memcookie[j].addr,
333 				    priv_addr->memcookie[j].size);
334 			}
335 
336 		}
337 		priv_addr->ncookies = ncookies;
338 		priv_addr->dstate = VIO_DESC_FREE;
339 
340 		if (pub_addr != NULL) {
341 
342 			/* link pub and private sides */
343 			priv_addr->descp = pub_addr;
344 
345 			pub_addr->ncookies = priv_addr->ncookies;
346 
347 			for (nc = 0; nc < pub_addr->ncookies; nc++) {
348 				bcopy(&priv_addr->memcookie[nc],
349 				    &pub_addr->memcookie[nc],
350 				    sizeof (ldc_mem_cookie_t));
351 			}
352 
353 			pub_addr->hdr.dstate = VIO_DESC_FREE;
354 			pub_addr++;
355 		}
356 
357 		/*
358 		 * move to next element in the dring and the next
359 		 * position in the data buffer.
360 		 */
361 		priv_addr++;
362 		tmpp += offset;
363 	}
364 
365 	return (0);
366 
367 fail:
368 	/* return failure; caller will cleanup */
369 	return (1);
370 }
371 
372 /*
373  * Free transmit resources for the channel.
374  */
375 void
376 vsw_destroy_tx_dring(vsw_ldc_t *ldcp)
377 {
378 	vsw_private_desc_t	*paddr = NULL;
379 	int			i;
380 	lane_t			*lp = &ldcp->lane_out;
381 	dring_info_t		*dp;
382 
383 	dp = lp->dringp;
384 	if (dp == NULL) {
385 		return;
386 	}
387 
388 	mutex_enter(&dp->dlock);
389 
390 	if (dp->priv_addr != NULL) {
391 		/*
392 		 * First unbind and free the memory handles
393 		 * stored in each descriptor within the ring.
394 		 */
395 		for (i = 0; i < vsw_num_descriptors; i++) {
396 			paddr = (vsw_private_desc_t *)dp->priv_addr + i;
397 			if (paddr->memhandle != NULL) {
398 				if (paddr->bound == 1) {
399 					if (ldc_mem_unbind_handle(
400 					    paddr->memhandle) != 0) {
401 						DERR(NULL, "error "
402 						"unbinding handle for "
403 						"ring 0x%llx at pos %d",
404 						    dp, i);
405 						continue;
406 					}
407 					paddr->bound = 0;
408 				}
409 
410 				if (ldc_mem_free_handle(
411 				    paddr->memhandle) != 0) {
412 					DERR(NULL, "error freeing "
413 					    "handle for ring 0x%llx "
414 					    "at pos %d", dp, i);
415 					continue;
416 				}
417 				paddr->memhandle = NULL;
418 			}
419 			mutex_destroy(&paddr->dstate_lock);
420 		}
421 		kmem_free(dp->priv_addr,
422 		    (sizeof (vsw_private_desc_t) * vsw_num_descriptors));
423 	}
424 
425 	/*
426 	 * Now unbind and destroy the ring itself.
427 	 */
428 	if (dp->dring_handle != NULL) {
429 		(void) ldc_mem_dring_unbind(dp->dring_handle);
430 		(void) ldc_mem_dring_destroy(dp->dring_handle);
431 	}
432 
433 	if (dp->data_addr != NULL) {
434 		kmem_free(dp->data_addr, dp->data_sz);
435 	}
436 
437 	mutex_exit(&dp->dlock);
438 	mutex_destroy(&dp->dlock);
439 	mutex_destroy(&dp->restart_lock);
440 	kmem_free(dp, sizeof (dring_info_t));
441 	lp->dringp = NULL;
442 }
443 
444 /*
445  * Map the transmit descriptor ring exported
446  * by the peer, as our receive descriptor ring.
447  */
448 dring_info_t *
449 vsw_map_rx_dring(vsw_ldc_t *ldcp, void *pkt)
450 {
451 	int			rv;
452 	dring_info_t		*dp;
453 	vio_dring_reg_msg_t	*dring_pkt = pkt;
454 	vsw_t			*vswp = ldcp->ldc_vswp;
455 
456 	dp = vsw_map_dring_cmn(ldcp, dring_pkt);
457 	if (dp == NULL) {
458 		return (NULL);
459 	}
460 
461 	/* TxDring mode specific initializations */
462 	dp->end_idx = 0;
463 	ldcp->lane_in.dringp = dp;
464 
465 	/* Allocate pools of receive mblks */
466 	rv = vsw_init_multipools(ldcp, vswp);
467 	if (rv != 0) {
468 		/*
469 		 * We do not return failure if receive mblk pools can't
470 		 * be allocated, instead allocb(9F) will be used to
471 		 * dynamically allocate buffers during receive.
472 		 */
473 		DWARN(vswp, "%s: unable to create free mblk pools for"
474 		    " channel %ld (rv %d)", __func__, ldcp->ldc_id, rv);
475 	}
476 
477 	return (dp);
478 }
479 
480 /*
481  * Unmap the receive descriptor ring.
482  */
483 void
484 vsw_unmap_rx_dring(vsw_ldc_t *ldcp)
485 {
486 	vio_mblk_pool_t *fvmp = NULL;
487 	vsw_t		*vswp = ldcp->ldc_vswp;
488 	lane_t		*lp = &ldcp->lane_in;
489 	dring_info_t	*dp;
490 
491 	if ((dp = lp->dringp) == NULL) {
492 		return;
493 	}
494 
495 	/*
496 	 * If we can't destroy all the rx pools for this channel,
497 	 * dispatch a task to retry and clean up those rx pools. Note
498 	 * that we don't need to wait for the task to complete. If the
499 	 * vsw device itself gets detached (vsw_detach()), it will wait
500 	 * for the task to complete implicitly in ddi_taskq_destroy().
501 	 */
502 	vio_destroy_multipools(&ldcp->vmp, &fvmp);
503 	if (fvmp != NULL) {
504 		(void) ddi_taskq_dispatch(vswp->rxp_taskq,
505 		    vsw_destroy_rxpools, fvmp, DDI_SLEEP);
506 	}
507 
508 	if (dp->dring_handle != NULL) {
509 		(void) ldc_mem_dring_unmap(dp->dring_handle);
510 	}
511 	kmem_free(dp, sizeof (dring_info_t));
512 	lp->dringp = NULL;
513 }
514 
515 static int
516 vsw_init_multipools(vsw_ldc_t *ldcp, vsw_t *vswp)
517 {
518 	size_t		data_sz;
519 	int		rv;
520 	uint32_t	sz1 = 0;
521 	uint32_t	sz2 = 0;
522 	uint32_t	sz3 = 0;
523 	uint32_t	sz4 = 0;
524 
525 	/*
526 	 * We round up the mtu specified to be a multiple of 2K to limit the
527 	 * number of rx buffer pools created for a given mtu.
528 	 */
529 	data_sz = vswp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
530 	data_sz = VNET_ROUNDUP_2K(data_sz);
531 
532 	/*
533 	 * If pool sizes are specified, use them. Note that the presence of
534 	 * the first tunable will be used as a hint.
535 	 */
536 	if (vsw_mblk_size1 != 0) {
537 		sz1 = vsw_mblk_size1;
538 		sz2 = vsw_mblk_size2;
539 		sz3 = vsw_mblk_size3;
540 		sz4 = vsw_mblk_size4;
541 
542 		if (sz4 == 0) { /* need 3 pools */
543 
544 			ldcp->max_rxpool_size = sz3;
545 			rv = vio_init_multipools(&ldcp->vmp,
546 			    VSW_NUM_VMPOOLS, sz1, sz2, sz3,
547 			    vsw_num_mblks1, vsw_num_mblks2, vsw_num_mblks3);
548 
549 		} else {
550 
551 			ldcp->max_rxpool_size = sz4;
552 			rv = vio_init_multipools(&ldcp->vmp,
553 			    VSW_NUM_VMPOOLS + 1, sz1, sz2, sz3, sz4,
554 			    vsw_num_mblks1, vsw_num_mblks2, vsw_num_mblks3,
555 			    vsw_num_mblks4);
556 
557 		}
558 
559 		return (rv);
560 	}
561 
562 	/*
563 	 * Pool sizes are not specified. We select the pool sizes based on the
564 	 * mtu if vnet_jumbo_rxpools is enabled.
565 	 */
566 	if (vsw_jumbo_rxpools == B_FALSE || data_sz == VNET_2K) {
567 		/*
568 		 * Receive buffer pool allocation based on mtu is disabled.
569 		 * Use the default mechanism of standard size pool allocation.
570 		 */
571 		sz1 = VSW_MBLK_SZ_128;
572 		sz2 = VSW_MBLK_SZ_256;
573 		sz3 = VSW_MBLK_SZ_2048;
574 		ldcp->max_rxpool_size = sz3;
575 
576 		rv = vio_init_multipools(&ldcp->vmp, VSW_NUM_VMPOOLS,
577 		    sz1, sz2, sz3,
578 		    vsw_num_mblks1, vsw_num_mblks2, vsw_num_mblks3);
579 
580 		return (rv);
581 	}
582 
583 	switch (data_sz) {
584 
585 	case VNET_4K:
586 
587 		sz1 = VSW_MBLK_SZ_128;
588 		sz2 = VSW_MBLK_SZ_256;
589 		sz3 = VSW_MBLK_SZ_2048;
590 		sz4 = sz3 << 1;			/* 4K */
591 		ldcp->max_rxpool_size = sz4;
592 
593 		rv = vio_init_multipools(&ldcp->vmp, VSW_NUM_VMPOOLS + 1,
594 		    sz1, sz2, sz3, sz4,
595 		    vsw_num_mblks1, vsw_num_mblks2, vsw_num_mblks3,
596 		    vsw_num_mblks4);
597 		break;
598 
599 	default:	/* data_sz:  4K+ to 16K */
600 
601 		sz1 = VSW_MBLK_SZ_256;
602 		sz2 = VSW_MBLK_SZ_2048;
603 		sz3 = data_sz >> 1;	/* Jumbo-size/2 */
604 		sz4 = data_sz;	/* Jumbo-size */
605 		ldcp->max_rxpool_size = sz4;
606 
607 		rv = vio_init_multipools(&ldcp->vmp, VSW_NUM_VMPOOLS + 1,
608 		    sz1, sz2, sz3, sz4,
609 		    vsw_num_mblks1, vsw_num_mblks2, vsw_num_mblks3,
610 		    vsw_num_mblks4);
611 		break;
612 	}
613 
614 	return (rv);
615 
616 }
617 
618 /*
619  * Generic routine to send message out over ldc channel.
620  *
621  * It is possible that when we attempt to write over the ldc channel
622  * that we get notified that it has been reset. Depending on the value
623  * of the handle_reset flag we either handle that event here or simply
624  * notify the caller that the channel was reset.
625  */
626 int
627 vsw_send_msg(vsw_ldc_t *ldcp, void *msgp, int size, boolean_t handle_reset)
628 {
629 	int			rv;
630 	size_t			msglen = size;
631 	vio_msg_tag_t		*tag = (vio_msg_tag_t *)msgp;
632 	vsw_t			*vswp = ldcp->ldc_vswp;
633 	vio_dring_msg_t		*dmsg;
634 	vio_raw_data_msg_t	*rmsg;
635 	vnet_ibnd_desc_t	*imsg;
636 	boolean_t		data_msg = B_FALSE;
637 	int			retries = vsw_wretries;
638 
639 	D1(vswp, "vsw_send_msg (%lld) enter : sending %d bytes",
640 	    ldcp->ldc_id, size);
641 
642 	D2(vswp, "send_msg: type 0x%llx", tag->vio_msgtype);
643 	D2(vswp, "send_msg: stype 0x%llx", tag->vio_subtype);
644 	D2(vswp, "send_msg: senv 0x%llx", tag->vio_subtype_env);
645 
646 	mutex_enter(&ldcp->ldc_txlock);
647 
648 	if (tag->vio_subtype == VIO_SUBTYPE_INFO) {
649 		if (tag->vio_subtype_env == VIO_DRING_DATA) {
650 			dmsg = (vio_dring_msg_t *)tag;
651 			dmsg->seq_num = ldcp->lane_out.seq_num;
652 			data_msg = B_TRUE;
653 		} else if (tag->vio_subtype_env == VIO_PKT_DATA) {
654 			rmsg = (vio_raw_data_msg_t *)tag;
655 			rmsg->seq_num = ldcp->lane_out.seq_num;
656 			data_msg = B_TRUE;
657 		} else if (tag->vio_subtype_env == VIO_DESC_DATA) {
658 			imsg = (vnet_ibnd_desc_t *)tag;
659 			imsg->hdr.seq_num = ldcp->lane_out.seq_num;
660 			data_msg = B_TRUE;
661 		}
662 	}
663 
664 	do {
665 		msglen = size;
666 		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msgp, &msglen);
667 	} while (rv == EWOULDBLOCK && --retries > 0);
668 
669 	if (rv == 0 && data_msg == B_TRUE) {
670 		ldcp->lane_out.seq_num++;
671 	}
672 
673 	if ((rv != 0) || (msglen != size)) {
674 		DERR(vswp, "vsw_send_msg:ldc_write failed: chan(%lld) rv(%d) "
675 		    "size (%d) msglen(%d)\n", ldcp->ldc_id, rv, size, msglen);
676 		ldcp->ldc_stats.oerrors++;
677 	}
678 
679 	mutex_exit(&ldcp->ldc_txlock);
680 
681 	/*
682 	 * If channel has been reset we either handle it here or
683 	 * simply report back that it has been reset and let caller
684 	 * decide what to do.
685 	 */
686 	if (rv == ECONNRESET) {
687 		DWARN(vswp, "%s (%lld) channel reset", __func__, ldcp->ldc_id);
688 
689 		if (handle_reset) {
690 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
691 		}
692 	}
693 
694 	return (rv);
695 }
696 
697 /*
698  * A per LDC worker thread to process ldc messages. This thread is woken up by
699  * the LDC interrupt handler to process LDC packets and receive data.
700  */
701 void
702 vsw_ldc_msg_worker(void *arg)
703 {
704 	callb_cpr_t	cprinfo;
705 	vsw_ldc_t	*ldcp = (vsw_ldc_t *)arg;
706 	vsw_t		*vswp = ldcp->ldc_vswp;
707 
708 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
709 	CALLB_CPR_INIT(&cprinfo, &ldcp->msg_thr_lock, callb_generic_cpr,
710 	    "vsw_msg_thread");
711 	mutex_enter(&ldcp->msg_thr_lock);
712 	while (!(ldcp->msg_thr_flags & VSW_WTHR_STOP)) {
713 
714 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
715 		/*
716 		 * Wait until the data is received or a stop
717 		 * request is received.
718 		 */
719 		while (!(ldcp->msg_thr_flags &
720 		    (VSW_WTHR_DATARCVD | VSW_WTHR_STOP))) {
721 			cv_wait(&ldcp->msg_thr_cv, &ldcp->msg_thr_lock);
722 		}
723 		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->msg_thr_lock)
724 
725 		/*
726 		 * First process the stop request.
727 		 */
728 		if (ldcp->msg_thr_flags & VSW_WTHR_STOP) {
729 			D2(vswp, "%s(%lld):Rx thread stopped\n",
730 			    __func__, ldcp->ldc_id);
731 			break;
732 		}
733 		ldcp->msg_thr_flags &= ~VSW_WTHR_DATARCVD;
734 		mutex_exit(&ldcp->msg_thr_lock);
735 		D1(vswp, "%s(%lld):calling vsw_process_pkt\n",
736 		    __func__, ldcp->ldc_id);
737 		mutex_enter(&ldcp->ldc_cblock);
738 		vsw_process_pkt(ldcp);
739 		mutex_exit(&ldcp->ldc_cblock);
740 		mutex_enter(&ldcp->msg_thr_lock);
741 	}
742 
743 	/*
744 	 * Update the run status and wakeup the thread that
745 	 * has sent the stop request.
746 	 */
747 	ldcp->msg_thr_flags &= ~VSW_WTHR_STOP;
748 	ldcp->msg_thread = NULL;
749 	CALLB_CPR_EXIT(&cprinfo);
750 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
751 	thread_exit();
752 }
753 
754 /* Co-ordinate with msg processing thread to stop it */
755 void
756 vsw_stop_msg_thread(vsw_ldc_t *ldcp)
757 {
758 	kt_did_t	tid = 0;
759 	vsw_t		*vswp = ldcp->ldc_vswp;
760 
761 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
762 	/*
763 	 * Send a stop request by setting the stop flag and
764 	 * wait until the msg process thread stops.
765 	 */
766 	mutex_enter(&ldcp->msg_thr_lock);
767 	if (ldcp->msg_thread != NULL) {
768 		tid = ldcp->msg_thread->t_did;
769 		ldcp->msg_thr_flags |= VSW_WTHR_STOP;
770 		cv_signal(&ldcp->msg_thr_cv);
771 	}
772 	mutex_exit(&ldcp->msg_thr_lock);
773 
774 	if (tid != 0) {
775 		thread_join(tid);
776 	}
777 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
778 }
779 
780 /*
781  * Send packet out via descriptor ring to a logical device.
782  */
783 int
784 vsw_dringsend(vsw_ldc_t *ldcp, mblk_t *mp)
785 {
786 	vio_dring_msg_t		dring_pkt;
787 	dring_info_t		*dp = NULL;
788 	vsw_private_desc_t	*priv_desc = NULL;
789 	vnet_public_desc_t	*pub = NULL;
790 	vsw_t			*vswp = ldcp->ldc_vswp;
791 	mblk_t			*bp;
792 	size_t			n, size;
793 	caddr_t			bufp;
794 	int			idx;
795 	int			status = LDC_TX_SUCCESS;
796 	struct ether_header	*ehp = (struct ether_header *)mp->b_rptr;
797 	lane_t			*lp = &ldcp->lane_out;
798 
799 	D1(vswp, "%s(%lld): enter\n", __func__, ldcp->ldc_id);
800 
801 	/* TODO: make test a macro */
802 	if ((!(ldcp->lane_out.lstate & VSW_LANE_ACTIVE)) ||
803 	    (ldcp->ldc_status != LDC_UP) || (ldcp->ldc_handle == NULL)) {
804 		DWARN(vswp, "%s(%lld) status(%d) lstate(0x%llx), dropping "
805 		    "packet\n", __func__, ldcp->ldc_id, ldcp->ldc_status,
806 		    ldcp->lane_out.lstate);
807 		ldcp->ldc_stats.oerrors++;
808 		return (LDC_TX_FAILURE);
809 	}
810 
811 	if ((dp = ldcp->lane_out.dringp) == NULL) {
812 		DERR(vswp, "%s(%lld): no dring for outbound lane on"
813 		    " channel %d", __func__, ldcp->ldc_id, ldcp->ldc_id);
814 		ldcp->ldc_stats.oerrors++;
815 		return (LDC_TX_FAILURE);
816 	}
817 
818 	size = msgsize(mp);
819 	if (size > (size_t)lp->mtu) {
820 		DERR(vswp, "%s(%lld) invalid size (%ld)\n", __func__,
821 		    ldcp->ldc_id, size);
822 		ldcp->ldc_stats.oerrors++;
823 		return (LDC_TX_FAILURE);
824 	}
825 
826 	/*
827 	 * Find a free descriptor
828 	 *
829 	 * Note: for the moment we are assuming that we will only
830 	 * have one dring going from the switch to each of its
831 	 * peers. This may change in the future.
832 	 */
833 	if (vsw_dring_find_free_desc(dp, &priv_desc, &idx) != 0) {
834 		D2(vswp, "%s(%lld): no descriptor available for ring "
835 		    "at 0x%llx", __func__, ldcp->ldc_id, dp);
836 
837 		/* nothing more we can do */
838 		status = LDC_TX_NORESOURCES;
839 		ldcp->ldc_stats.tx_no_desc++;
840 		goto vsw_dringsend_free_exit;
841 	} else {
842 		D2(vswp, "%s(%lld): free private descriptor found at pos %ld "
843 		    "addr 0x%llx\n", __func__, ldcp->ldc_id, idx, priv_desc);
844 	}
845 
846 	/* copy data into the descriptor */
847 	bufp = priv_desc->datap;
848 	bufp += VNET_IPALIGN;
849 	for (bp = mp, n = 0; bp != NULL; bp = bp->b_cont) {
850 		n = MBLKL(bp);
851 		bcopy(bp->b_rptr, bufp, n);
852 		bufp += n;
853 	}
854 
855 	priv_desc->datalen = (size < (size_t)ETHERMIN) ? ETHERMIN : size;
856 
857 	pub = priv_desc->descp;
858 	pub->nbytes = priv_desc->datalen;
859 
860 	/* update statistics */
861 	if (IS_BROADCAST(ehp))
862 		ldcp->ldc_stats.brdcstxmt++;
863 	else if (IS_MULTICAST(ehp))
864 		ldcp->ldc_stats.multixmt++;
865 	ldcp->ldc_stats.opackets++;
866 	ldcp->ldc_stats.obytes += priv_desc->datalen;
867 
868 	mutex_enter(&priv_desc->dstate_lock);
869 	pub->hdr.dstate = VIO_DESC_READY;
870 	mutex_exit(&priv_desc->dstate_lock);
871 
872 	/*
873 	 * Determine whether or not we need to send a message to our
874 	 * peer prompting them to read our newly updated descriptor(s).
875 	 */
876 	mutex_enter(&dp->restart_lock);
877 	if (dp->restart_reqd) {
878 		dp->restart_reqd = B_FALSE;
879 		ldcp->ldc_stats.dring_data_msgs_sent++;
880 		mutex_exit(&dp->restart_lock);
881 
882 		/*
883 		 * Send a vio_dring_msg to peer to prompt them to read
884 		 * the updated descriptor ring.
885 		 */
886 		dring_pkt.tag.vio_msgtype = VIO_TYPE_DATA;
887 		dring_pkt.tag.vio_subtype = VIO_SUBTYPE_INFO;
888 		dring_pkt.tag.vio_subtype_env = VIO_DRING_DATA;
889 		dring_pkt.tag.vio_sid = ldcp->local_session;
890 
891 		/* Note - for now using first ring */
892 		dring_pkt.dring_ident = dp->ident;
893 
894 		/*
895 		 * If last_ack_recv is -1 then we know we've not
896 		 * received any ack's yet, so this must be the first
897 		 * msg sent, so set the start to the begining of the ring.
898 		 */
899 		mutex_enter(&dp->dlock);
900 		if (dp->last_ack_recv == -1) {
901 			dring_pkt.start_idx = 0;
902 		} else {
903 			dring_pkt.start_idx =
904 			    (dp->last_ack_recv + 1) % dp->num_descriptors;
905 		}
906 		dring_pkt.end_idx = -1;
907 		mutex_exit(&dp->dlock);
908 
909 		D3(vswp, "%s(%lld): dring 0x%llx : ident 0x%llx\n", __func__,
910 		    ldcp->ldc_id, dp, dring_pkt.dring_ident);
911 		D3(vswp, "%s(%lld): start %lld : end %lld :\n",
912 		    __func__, ldcp->ldc_id, dring_pkt.start_idx,
913 		    dring_pkt.end_idx);
914 
915 		(void) vsw_send_msg(ldcp, (void *)&dring_pkt,
916 		    sizeof (vio_dring_msg_t), B_TRUE);
917 
918 		return (status);
919 
920 	} else {
921 		mutex_exit(&dp->restart_lock);
922 		D2(vswp, "%s(%lld): updating descp %d", __func__,
923 		    ldcp->ldc_id, idx);
924 	}
925 
926 vsw_dringsend_free_exit:
927 
928 	D1(vswp, "%s(%lld): exit\n", __func__, ldcp->ldc_id);
929 	return (status);
930 }
931 
932 /*
933  * Searches the private section of a ring for a free descriptor,
934  * starting at the location of the last free descriptor found
935  * previously.
936  *
937  * Returns 0 if free descriptor is available, and updates state
938  * of private descriptor to VIO_DESC_READY,  otherwise returns 1.
939  *
940  * FUTURE: might need to return contiguous range of descriptors
941  * as dring info msg assumes all will be contiguous.
942  */
943 int
944 vsw_dring_find_free_desc(dring_info_t *dringp,
945 		vsw_private_desc_t **priv_p, int *idx)
946 {
947 	vsw_private_desc_t	*addr = NULL;
948 	int			num = vsw_num_descriptors;
949 	int			ret = 1;
950 
951 	D1(NULL, "%s enter\n", __func__);
952 
953 	ASSERT(dringp->priv_addr != NULL);
954 
955 	D2(NULL, "%s: searching ring, dringp 0x%llx : start pos %lld",
956 	    __func__, dringp, dringp->end_idx);
957 
958 	addr = (vsw_private_desc_t *)dringp->priv_addr + dringp->end_idx;
959 
960 	mutex_enter(&addr->dstate_lock);
961 	if (addr->dstate == VIO_DESC_FREE) {
962 		addr->dstate = VIO_DESC_READY;
963 		*priv_p = addr;
964 		*idx = dringp->end_idx;
965 		dringp->end_idx = (dringp->end_idx + 1) % num;
966 		ret = 0;
967 
968 	}
969 	mutex_exit(&addr->dstate_lock);
970 
971 	/* ring full */
972 	if (ret == 1) {
973 		D2(NULL, "%s: no desp free: started at %d", __func__,
974 		    dringp->end_idx);
975 	}
976 
977 	D1(NULL, "%s: exit\n", __func__);
978 
979 	return (ret);
980 }
981 
982 /* vsw_reclaim_dring -- reclaim descriptors */
983 int
984 vsw_reclaim_dring(dring_info_t *dp, int start)
985 {
986 	int i, j, len;
987 	vsw_private_desc_t *priv_addr;
988 	vnet_public_desc_t *pub_addr;
989 
990 	pub_addr = (vnet_public_desc_t *)dp->pub_addr;
991 	priv_addr = (vsw_private_desc_t *)dp->priv_addr;
992 	len = dp->num_descriptors;
993 
994 	D2(NULL, "%s: start index %ld\n", __func__, start);
995 
996 	j = 0;
997 	for (i = start; j < len; i = (i + 1) % len, j++) {
998 		pub_addr = (vnet_public_desc_t *)dp->pub_addr + i;
999 		priv_addr = (vsw_private_desc_t *)dp->priv_addr + i;
1000 
1001 		mutex_enter(&priv_addr->dstate_lock);
1002 		if (pub_addr->hdr.dstate != VIO_DESC_DONE) {
1003 			mutex_exit(&priv_addr->dstate_lock);
1004 			break;
1005 		}
1006 		pub_addr->hdr.dstate = VIO_DESC_FREE;
1007 		priv_addr->dstate = VIO_DESC_FREE;
1008 		/* clear all the fields */
1009 		priv_addr->datalen = 0;
1010 		pub_addr->hdr.ack = 0;
1011 		mutex_exit(&priv_addr->dstate_lock);
1012 
1013 		D3(NULL, "claiming descp:%d pub state:0x%llx priv state 0x%llx",
1014 		    i, pub_addr->hdr.dstate, priv_addr->dstate);
1015 	}
1016 	return (j);
1017 }
1018 
1019 void
1020 vsw_process_dringdata(void *arg, void *dpkt)
1021 {
1022 	vsw_ldc_t		*ldcp = arg;
1023 	vio_dring_msg_t		*dring_pkt;
1024 	vnet_public_desc_t	desc, *pub_addr = NULL;
1025 	vsw_private_desc_t	*priv_addr = NULL;
1026 	dring_info_t		*dp = NULL;
1027 	vsw_t			*vswp = ldcp->ldc_vswp;
1028 	mblk_t			*mp = NULL;
1029 	vio_mblk_t		*vmp = NULL;
1030 	mblk_t			*bp = NULL;
1031 	mblk_t			*bpt = NULL;
1032 	size_t			nbytes = 0;
1033 	uint64_t		chain = 0;
1034 	uint64_t		len;
1035 	uint32_t		pos, start;
1036 	uint32_t		range_start, range_end;
1037 	int32_t			end, num, cnt = 0;
1038 	int			i, rv, rng_rv = 0, msg_rv = 0;
1039 	boolean_t		prev_desc_ack = B_FALSE;
1040 	int			read_attempts = 0;
1041 	struct ether_header	*ehp;
1042 	lane_t			*lp = &ldcp->lane_out;
1043 
1044 	D1(vswp, "%s(%lld): enter", __func__, ldcp->ldc_id);
1045 
1046 	/*
1047 	 * We know this is a data/dring packet so
1048 	 * cast it into the correct structure.
1049 	 */
1050 	dring_pkt = (vio_dring_msg_t *)dpkt;
1051 
1052 	/*
1053 	 * Switch on the vio_subtype. If its INFO then we need to
1054 	 * process the data. If its an ACK we need to make sure
1055 	 * it makes sense (i.e did we send an earlier data/info),
1056 	 * and if its a NACK then we maybe attempt a retry.
1057 	 */
1058 	switch (dring_pkt->tag.vio_subtype) {
1059 	case VIO_SUBTYPE_INFO:
1060 		D2(vswp, "%s(%lld): VIO_SUBTYPE_INFO", __func__, ldcp->ldc_id);
1061 
1062 		dp = ldcp->lane_in.dringp;
1063 		if (dp->ident != dring_pkt->dring_ident) {
1064 			DERR(vswp, "%s(%lld): unable to find dring from "
1065 			    "ident 0x%llx", __func__, ldcp->ldc_id,
1066 			    dring_pkt->dring_ident);
1067 
1068 			SND_DRING_NACK(ldcp, dring_pkt);
1069 			return;
1070 		}
1071 
1072 		ldcp->ldc_stats.dring_data_msgs_rcvd++;
1073 
1074 		start = pos = dring_pkt->start_idx;
1075 		end = dring_pkt->end_idx;
1076 		len = dp->num_descriptors;
1077 
1078 		range_start = range_end = pos;
1079 
1080 		D2(vswp, "%s(%lld): start index %ld : end %ld\n",
1081 		    __func__, ldcp->ldc_id, start, end);
1082 
1083 		if (end == -1) {
1084 			num = -1;
1085 		} else if (end >= 0) {
1086 			num = end >= pos ? end - pos + 1: (len - pos + 1) + end;
1087 
1088 			/* basic sanity check */
1089 			if (end > len) {
1090 				DERR(vswp, "%s(%lld): endpoint %lld outside "
1091 				    "ring length %lld", __func__,
1092 				    ldcp->ldc_id, end, len);
1093 
1094 				SND_DRING_NACK(ldcp, dring_pkt);
1095 				return;
1096 			}
1097 		} else {
1098 			DERR(vswp, "%s(%lld): invalid endpoint %lld",
1099 			    __func__, ldcp->ldc_id, end);
1100 			SND_DRING_NACK(ldcp, dring_pkt);
1101 			return;
1102 		}
1103 
1104 		while (cnt != num) {
1105 vsw_recheck_desc:
1106 			pub_addr = (vnet_public_desc_t *)dp->pub_addr + pos;
1107 
1108 			if ((rng_rv = vnet_dring_entry_copy(pub_addr,
1109 			    &desc, dp->dring_mtype, dp->dring_handle,
1110 			    pos, pos)) != 0) {
1111 				DERR(vswp, "%s(%lld): unable to copy "
1112 				    "descriptor at pos %d: err %d",
1113 				    __func__, pos, ldcp->ldc_id, rng_rv);
1114 				ldcp->ldc_stats.ierrors++;
1115 				break;
1116 			}
1117 
1118 			/*
1119 			 * When given a bounded range of descriptors
1120 			 * to process, its an error to hit a descriptor
1121 			 * which is not ready. In the non-bounded case
1122 			 * (end_idx == -1) this simply indicates we have
1123 			 * reached the end of the current active range.
1124 			 */
1125 			if (desc.hdr.dstate != VIO_DESC_READY) {
1126 				/* unbound - no error */
1127 				if (end == -1) {
1128 					if (read_attempts == vsw_recv_retries)
1129 						break;
1130 
1131 					delay(drv_usectohz(vsw_recv_delay));
1132 					read_attempts++;
1133 					goto vsw_recheck_desc;
1134 				}
1135 
1136 				/* bounded - error - so NACK back */
1137 				DERR(vswp, "%s(%lld): descriptor not READY "
1138 				    "(%d)", __func__, ldcp->ldc_id,
1139 				    desc.hdr.dstate);
1140 				SND_DRING_NACK(ldcp, dring_pkt);
1141 				return;
1142 			}
1143 
1144 			DTRACE_PROBE1(read_attempts, int, read_attempts);
1145 
1146 			range_end = pos;
1147 
1148 			/*
1149 			 * If we ACK'd the previous descriptor then now
1150 			 * record the new range start position for later
1151 			 * ACK's.
1152 			 */
1153 			if (prev_desc_ack) {
1154 				range_start = pos;
1155 
1156 				D2(vswp, "%s(%lld): updating range start to be "
1157 				    "%d", __func__, ldcp->ldc_id, range_start);
1158 
1159 				prev_desc_ack = B_FALSE;
1160 			}
1161 
1162 			D2(vswp, "%s(%lld): processing desc %lld at pos"
1163 			    " 0x%llx : dstate 0x%lx : datalen 0x%lx",
1164 			    __func__, ldcp->ldc_id, pos, &desc,
1165 			    desc.hdr.dstate, desc.nbytes);
1166 
1167 			if ((desc.nbytes < ETHERMIN) ||
1168 			    (desc.nbytes > lp->mtu)) {
1169 				/* invalid size; drop the packet */
1170 				ldcp->ldc_stats.ierrors++;
1171 				goto vsw_process_desc_done;
1172 			}
1173 
1174 			/*
1175 			 * Ensure that we ask ldc for an aligned
1176 			 * number of bytes. Data is padded to align on 8
1177 			 * byte boundary, desc.nbytes is actual data length,
1178 			 * i.e. minus that padding.
1179 			 */
1180 			nbytes = (desc.nbytes + VNET_IPALIGN + 7) & ~7;
1181 			if (nbytes > ldcp->max_rxpool_size) {
1182 				mp = allocb(desc.nbytes + VNET_IPALIGN + 8,
1183 				    BPRI_MED);
1184 				vmp = NULL;
1185 			} else {
1186 				vmp = vio_multipool_allocb(&ldcp->vmp, nbytes);
1187 				if (vmp == NULL) {
1188 					ldcp->ldc_stats.rx_vio_allocb_fail++;
1189 					/*
1190 					 * No free receive buffers available,
1191 					 * so fallback onto allocb(9F). Make
1192 					 * sure that we get a data buffer which
1193 					 * is a multiple of 8 as this is
1194 					 * required by ldc_mem_copy.
1195 					 */
1196 					DTRACE_PROBE(allocb);
1197 					mp = allocb(desc.nbytes +
1198 					    VNET_IPALIGN + 8, BPRI_MED);
1199 				} else {
1200 					mp = vmp->mp;
1201 				}
1202 			}
1203 			if (mp == NULL) {
1204 				DERR(vswp, "%s(%ld): allocb failed",
1205 				    __func__, ldcp->ldc_id);
1206 				rng_rv = vnet_dring_entry_set_dstate(pub_addr,
1207 				    dp->dring_mtype, dp->dring_handle, pos, pos,
1208 				    VIO_DESC_DONE);
1209 				ldcp->ldc_stats.ierrors++;
1210 				ldcp->ldc_stats.rx_allocb_fail++;
1211 				break;
1212 			}
1213 
1214 			rv = ldc_mem_copy(ldcp->ldc_handle,
1215 			    (caddr_t)mp->b_rptr, 0, &nbytes,
1216 			    desc.memcookie, desc.ncookies, LDC_COPY_IN);
1217 			if (rv != 0) {
1218 				DERR(vswp, "%s(%d): unable to copy in data "
1219 				    "from %d cookies in desc %d (rv %d)",
1220 				    __func__, ldcp->ldc_id, desc.ncookies,
1221 				    pos, rv);
1222 				freemsg(mp);
1223 
1224 				rng_rv = vnet_dring_entry_set_dstate(pub_addr,
1225 				    dp->dring_mtype, dp->dring_handle, pos, pos,
1226 				    VIO_DESC_DONE);
1227 				ldcp->ldc_stats.ierrors++;
1228 				break;
1229 			} else {
1230 				D2(vswp, "%s(%d): copied in %ld bytes"
1231 				    " using %d cookies", __func__,
1232 				    ldcp->ldc_id, nbytes, desc.ncookies);
1233 			}
1234 
1235 			/* adjust the read pointer to skip over the padding */
1236 			mp->b_rptr += VNET_IPALIGN;
1237 
1238 			/* point to the actual end of data */
1239 			mp->b_wptr = mp->b_rptr + desc.nbytes;
1240 
1241 			if (vmp != NULL) {
1242 				vmp->state = VIO_MBLK_HAS_DATA;
1243 			}
1244 
1245 			/* update statistics */
1246 			ehp = (struct ether_header *)mp->b_rptr;
1247 			if (IS_BROADCAST(ehp))
1248 				ldcp->ldc_stats.brdcstrcv++;
1249 			else if (IS_MULTICAST(ehp))
1250 				ldcp->ldc_stats.multircv++;
1251 
1252 			ldcp->ldc_stats.ipackets++;
1253 			ldcp->ldc_stats.rbytes += desc.nbytes;
1254 
1255 			/*
1256 			 * IPALIGN space can be used for VLAN_TAG
1257 			 */
1258 			(void) vsw_vlan_frame_pretag(ldcp->ldc_port,
1259 			    VSW_VNETPORT, mp);
1260 
1261 			/* build a chain of received packets */
1262 			if (bp == NULL) {
1263 				/* first pkt */
1264 				bp = mp;
1265 				bp->b_next = bp->b_prev = NULL;
1266 				bpt = bp;
1267 				chain = 1;
1268 			} else {
1269 				mp->b_next = mp->b_prev = NULL;
1270 				bpt->b_next = mp;
1271 				bpt = mp;
1272 				chain++;
1273 			}
1274 
1275 vsw_process_desc_done:
1276 			/* mark we are finished with this descriptor */
1277 			if ((rng_rv = vnet_dring_entry_set_dstate(pub_addr,
1278 			    dp->dring_mtype, dp->dring_handle, pos, pos,
1279 			    VIO_DESC_DONE)) != 0) {
1280 				DERR(vswp, "%s(%lld): unable to update "
1281 				    "dstate at pos %d: err %d",
1282 				    __func__, pos, ldcp->ldc_id, rng_rv);
1283 				ldcp->ldc_stats.ierrors++;
1284 				break;
1285 			}
1286 
1287 			/*
1288 			 * Send an ACK back to peer if requested.
1289 			 */
1290 			if (desc.hdr.ack) {
1291 				dring_pkt->start_idx = range_start;
1292 				dring_pkt->end_idx = range_end;
1293 
1294 				DERR(vswp, "%s(%lld): processed %d %d, ACK"
1295 				    " requested", __func__, ldcp->ldc_id,
1296 				    dring_pkt->start_idx, dring_pkt->end_idx);
1297 
1298 				dring_pkt->dring_process_state = VIO_DP_ACTIVE;
1299 				dring_pkt->tag.vio_subtype = VIO_SUBTYPE_ACK;
1300 				dring_pkt->tag.vio_sid = ldcp->local_session;
1301 
1302 				msg_rv = vsw_send_msg(ldcp, (void *)dring_pkt,
1303 				    sizeof (vio_dring_msg_t), B_FALSE);
1304 
1305 				/*
1306 				 * Check if ACK was successfully sent. If not
1307 				 * we break and deal with that below.
1308 				 */
1309 				if (msg_rv != 0)
1310 					break;
1311 
1312 				prev_desc_ack = B_TRUE;
1313 				range_start = pos;
1314 			}
1315 
1316 			/* next descriptor */
1317 			pos = (pos + 1) % len;
1318 			cnt++;
1319 
1320 			/*
1321 			 * Break out of loop here and stop processing to
1322 			 * allow some other network device (or disk) to
1323 			 * get access to the cpu.
1324 			 */
1325 			if (chain > vsw_chain_len) {
1326 				D3(vswp, "%s(%lld): switching chain of %d "
1327 				    "msgs", __func__, ldcp->ldc_id, chain);
1328 				break;
1329 			}
1330 		}
1331 
1332 		/* send the chain of packets to be switched */
1333 		if (bp != NULL) {
1334 			DTRACE_PROBE1(vsw_rcv_msgs, int, chain);
1335 			D3(vswp, "%s(%lld): switching chain of %d msgs",
1336 			    __func__, ldcp->ldc_id, chain);
1337 			vswp->vsw_switch_frame(vswp, bp, VSW_VNETPORT,
1338 			    ldcp->ldc_port, NULL);
1339 		}
1340 
1341 		/*
1342 		 * If when we encountered an error when attempting to
1343 		 * access an imported dring, initiate a connection reset.
1344 		 */
1345 		if (rng_rv != 0) {
1346 			vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
1347 			break;
1348 		}
1349 
1350 		/*
1351 		 * If when we attempted to send the ACK we found that the
1352 		 * channel had been reset then now handle this.
1353 		 */
1354 		if (msg_rv == ECONNRESET) {
1355 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1356 			break;
1357 		}
1358 
1359 		DTRACE_PROBE1(msg_cnt, int, cnt);
1360 
1361 		/*
1362 		 * We are now finished so ACK back with the state
1363 		 * set to STOPPING so our peer knows we are finished
1364 		 */
1365 		dring_pkt->tag.vio_subtype = VIO_SUBTYPE_ACK;
1366 		dring_pkt->tag.vio_sid = ldcp->local_session;
1367 
1368 		dring_pkt->dring_process_state = VIO_DP_STOPPED;
1369 
1370 		DTRACE_PROBE(stop_process_sent);
1371 
1372 		/*
1373 		 * We have not processed any more descriptors beyond
1374 		 * the last one we ACK'd.
1375 		 */
1376 		if (prev_desc_ack)
1377 			range_start = range_end;
1378 
1379 		dring_pkt->start_idx = range_start;
1380 		dring_pkt->end_idx = range_end;
1381 
1382 		D2(vswp, "%s(%lld) processed : %d : %d, now stopping",
1383 		    __func__, ldcp->ldc_id, dring_pkt->start_idx,
1384 		    dring_pkt->end_idx);
1385 
1386 		(void) vsw_send_msg(ldcp, (void *)dring_pkt,
1387 		    sizeof (vio_dring_msg_t), B_TRUE);
1388 		ldcp->ldc_stats.dring_data_acks_sent++;
1389 		ldcp->ldc_stats.dring_stopped_acks_sent++;
1390 		break;
1391 
1392 	case VIO_SUBTYPE_ACK:
1393 		D2(vswp, "%s(%lld): VIO_SUBTYPE_ACK", __func__, ldcp->ldc_id);
1394 		/*
1395 		 * Verify that the relevant descriptors are all
1396 		 * marked as DONE
1397 		 */
1398 		dp = ldcp->lane_out.dringp;
1399 		if (dp->ident != dring_pkt->dring_ident) {
1400 			DERR(vswp, "%s: unknown ident in ACK", __func__);
1401 			return;
1402 		}
1403 
1404 		start = end = 0;
1405 		start = dring_pkt->start_idx;
1406 		end = dring_pkt->end_idx;
1407 		len = dp->num_descriptors;
1408 
1409 
1410 		mutex_enter(&dp->dlock);
1411 		dp->last_ack_recv = end;
1412 		ldcp->ldc_stats.dring_data_acks_rcvd++;
1413 		mutex_exit(&dp->dlock);
1414 
1415 		(void) vsw_reclaim_dring(dp, start);
1416 
1417 		/*
1418 		 * If our peer is stopping processing descriptors then
1419 		 * we check to make sure it has processed all the descriptors
1420 		 * we have updated. If not then we send it a new message
1421 		 * to prompt it to restart.
1422 		 */
1423 		if (dring_pkt->dring_process_state == VIO_DP_STOPPED) {
1424 			DTRACE_PROBE(stop_process_recv);
1425 			D2(vswp, "%s(%lld): got stopping msg : %d : %d",
1426 			    __func__, ldcp->ldc_id, dring_pkt->start_idx,
1427 			    dring_pkt->end_idx);
1428 
1429 			/*
1430 			 * Check next descriptor in public section of ring.
1431 			 * If its marked as READY then we need to prompt our
1432 			 * peer to start processing the ring again.
1433 			 */
1434 			i = (end + 1) % len;
1435 			pub_addr = (vnet_public_desc_t *)dp->pub_addr + i;
1436 			priv_addr = (vsw_private_desc_t *)dp->priv_addr + i;
1437 
1438 			/*
1439 			 * Hold the restart lock across all of this to
1440 			 * make sure that its not possible for us to
1441 			 * decide that a msg needs to be sent in the future
1442 			 * but the sending code having already checked is
1443 			 * about to exit.
1444 			 */
1445 			mutex_enter(&dp->restart_lock);
1446 			ldcp->ldc_stats.dring_stopped_acks_rcvd++;
1447 			mutex_enter(&priv_addr->dstate_lock);
1448 			if (pub_addr->hdr.dstate == VIO_DESC_READY) {
1449 
1450 				mutex_exit(&priv_addr->dstate_lock);
1451 
1452 				dring_pkt->tag.vio_subtype = VIO_SUBTYPE_INFO;
1453 				dring_pkt->tag.vio_sid = ldcp->local_session;
1454 
1455 				dring_pkt->start_idx = (end + 1) % len;
1456 				dring_pkt->end_idx = -1;
1457 
1458 				D2(vswp, "%s(%lld) : sending restart msg:"
1459 				    " %d : %d", __func__, ldcp->ldc_id,
1460 				    dring_pkt->start_idx, dring_pkt->end_idx);
1461 
1462 				msg_rv = vsw_send_msg(ldcp, (void *)dring_pkt,
1463 				    sizeof (vio_dring_msg_t), B_FALSE);
1464 				ldcp->ldc_stats.dring_data_msgs_sent++;
1465 
1466 			} else {
1467 				mutex_exit(&priv_addr->dstate_lock);
1468 				dp->restart_reqd = B_TRUE;
1469 			}
1470 			mutex_exit(&dp->restart_lock);
1471 		}
1472 
1473 		if (msg_rv == ECONNRESET)
1474 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1475 
1476 		break;
1477 
1478 	case VIO_SUBTYPE_NACK:
1479 		DWARN(vswp, "%s(%lld): VIO_SUBTYPE_NACK",
1480 		    __func__, ldcp->ldc_id);
1481 		/*
1482 		 * Something is badly wrong if we are getting NACK's
1483 		 * for our data pkts. So reset the channel.
1484 		 */
1485 		vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
1486 
1487 		break;
1488 
1489 	default:
1490 		DERR(vswp, "%s(%lld): Unknown vio_subtype %x\n", __func__,
1491 		    ldcp->ldc_id, dring_pkt->tag.vio_subtype);
1492 	}
1493 
1494 	D1(vswp, "%s(%lld) exit", __func__, ldcp->ldc_id);
1495 }
1496