xref: /titanic_41/usr/src/uts/sun4v/io/vsw_rxdring.c (revision 6cbe5a00dfdddadcf091446d70b25dea5855db33)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/errno.h>
28 #include <sys/sysmacros.h>
29 #include <sys/param.h>
30 #include <sys/machsystm.h>
31 #include <sys/stream.h>
32 #include <sys/strsubr.h>
33 #include <sys/kmem.h>
34 #include <sys/strsun.h>
35 #include <sys/callb.h>
36 #include <sys/sdt.h>
37 #include <sys/mach_descrip.h>
38 #include <sys/mdeg.h>
39 #include <net/if.h>
40 #include <sys/vsw.h>
41 #include <sys/vio_mailbox.h>
42 #include <sys/vio_common.h>
43 #include <sys/vnet_common.h>
44 #include <sys/vnet_mailbox.h>
45 #include <sys/vio_util.h>
46 
47 /*
48  * This file contains the implementation of RxDringData transfer mode of VIO
49  * Protocol in vsw. The functions in this file are invoked from vsw_ldc.c
50  * after RxDringData mode is negotiated with the peer during attribute phase of
51  * handshake. This file contains functions that setup the transmit and receive
52  * descriptor rings, and associated resources in RxDringData mode. It also
53  * contains the transmit and receive data processing functions that are invoked
54  * in RxDringData mode. The data processing routines in this file have the
55  * suffix '_shm' to indicate the shared memory mechanism used in RxDringData
56  * mode.
57  */
58 
59 /* Functions exported to vsw_ldc.c */
60 vio_dring_reg_msg_t *vsw_create_rx_dring_info(vsw_ldc_t *);
61 void vsw_destroy_rx_dring(vsw_ldc_t *ldcp);
62 dring_info_t *vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt);
63 void vsw_unmap_tx_dring(vsw_ldc_t *ldcp);
64 int vsw_dringsend_shm(vsw_ldc_t *, mblk_t *);
65 void vsw_ldc_rcv_worker(void *arg);
66 void vsw_stop_rcv_thread(vsw_ldc_t *ldcp);
67 void vsw_process_dringdata_shm(void *, void *);
68 
69 /* Internal functions */
70 static dring_info_t *vsw_create_rx_dring(vsw_ldc_t *);
71 static int vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp);
72 static void vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp,
73 	vio_dring_msg_t *msg);
74 static void vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp,
75 	vio_dring_msg_t *msg);
76 static void vsw_ldc_rcv_shm(vsw_ldc_t *ldcp);
77 static int vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp);
78 static int vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size,
79     boolean_t handle_reset);
80 
81 /* Functions imported from vsw_ldc.c */
82 extern void vsw_process_pkt(void *);
83 extern void vsw_destroy_rxpools(void *);
84 extern dring_info_t *vsw_map_dring_cmn(vsw_ldc_t *ldcp,
85     vio_dring_reg_msg_t *dring_pkt);
86 extern void vsw_process_conn_evt(vsw_ldc_t *, uint16_t);
87 extern mblk_t *vsw_vlan_frame_pretag(void *arg, int type, mblk_t *mp);
88 
89 /* Tunables */
90 extern int vsw_wretries;
91 extern int vsw_recv_delay;
92 extern int vsw_recv_retries;
93 extern uint32_t vsw_chain_len;
94 extern uint32_t vsw_num_descriptors;
95 extern uint32_t vsw_nrbufs_factor;
96 
97 #define	VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count, total_count)	\
98 {									\
99 	DTRACE_PROBE2(vsw_rx_pkts, vsw_ldc_t *, (ldcp), int, (count));	\
100 	(vswp)->vsw_switch_frame((vswp), (bp), VSW_VNETPORT,		\
101 	    (ldcp)->ldc_port, NULL);					\
102 	(bp) = (bpt) = NULL;						\
103 	(count) = 0;							\
104 }
105 
106 vio_dring_reg_msg_t *
107 vsw_create_rx_dring_info(vsw_ldc_t *ldcp)
108 {
109 	vio_dring_reg_msg_t	*mp;
110 	vio_dring_reg_ext_msg_t	*emsg;
111 	dring_info_t		*dp;
112 	uint8_t			*buf;
113 	vsw_t			*vswp = ldcp->ldc_vswp;
114 
115 	D1(vswp, "%s enter\n", __func__);
116 
117 	/*
118 	 * If we can't create a dring, obviously no point sending
119 	 * a message.
120 	 */
121 	if ((dp = vsw_create_rx_dring(ldcp)) == NULL)
122 		return (NULL);
123 
124 	mp = kmem_zalloc(VNET_DRING_REG_EXT_MSG_SIZE(dp->data_ncookies),
125 	    KM_SLEEP);
126 
127 	mp->tag.vio_msgtype = VIO_TYPE_CTRL;
128 	mp->tag.vio_subtype = VIO_SUBTYPE_INFO;
129 	mp->tag.vio_subtype_env = VIO_DRING_REG;
130 	mp->tag.vio_sid = ldcp->local_session;
131 
132 	/* payload */
133 	mp->num_descriptors = dp->num_descriptors;
134 	mp->descriptor_size = dp->descriptor_size;
135 	mp->options = dp->options;
136 	mp->ncookies = dp->dring_ncookies;
137 	bcopy(&dp->dring_cookie[0], &mp->cookie[0],
138 	    sizeof (ldc_mem_cookie_t));
139 
140 	mp->dring_ident = 0;
141 
142 	buf = (uint8_t *)mp->cookie;
143 
144 	/* skip over dring cookies */
145 	ASSERT(mp->ncookies == 1);
146 	buf += (mp->ncookies * sizeof (ldc_mem_cookie_t));
147 
148 	emsg = (vio_dring_reg_ext_msg_t *)buf;
149 
150 	/* copy data_ncookies in the msg */
151 	emsg->data_ncookies = dp->data_ncookies;
152 
153 	/* copy data area size in the msg */
154 	emsg->data_area_size = dp->data_sz;
155 
156 	/* copy data area cookies in the msg */
157 	bcopy(dp->data_cookie, (ldc_mem_cookie_t *)emsg->data_cookie,
158 	    sizeof (ldc_mem_cookie_t) * dp->data_ncookies);
159 
160 	D1(vswp, "%s exit\n", __func__);
161 
162 	return (mp);
163 }
164 
165 /*
166  * Allocate receive resources for the channel. The resources consist of a
167  * receive descriptor ring and an associated receive buffer area.
168  */
169 static dring_info_t *
170 vsw_create_rx_dring(vsw_ldc_t *ldcp)
171 {
172 	vsw_t			*vswp = ldcp->ldc_vswp;
173 	ldc_mem_info_t		minfo;
174 	dring_info_t		*dp;
175 
176 	dp = (dring_info_t *)kmem_zalloc(sizeof (dring_info_t), KM_SLEEP);
177 	mutex_init(&dp->dlock, NULL, MUTEX_DRIVER, NULL);
178 	ldcp->lane_out.dringp = dp;
179 
180 	/* Create the receive descriptor ring */
181 	if ((ldc_mem_dring_create(vsw_num_descriptors,
182 	    sizeof (vnet_rx_dringdata_desc_t), &dp->dring_handle)) != 0) {
183 		DERR(vswp, "vsw_create_rx_dring(%lld): ldc dring create "
184 		    "failed", ldcp->ldc_id);
185 		goto fail;
186 	}
187 
188 	ASSERT(dp->dring_handle != NULL);
189 
190 	/* Get the addr of descriptor ring */
191 	if ((ldc_mem_dring_info(dp->dring_handle, &minfo)) != 0) {
192 		DERR(vswp, "vsw_create_rx_dring(%lld): dring info failed\n",
193 		    ldcp->ldc_id);
194 		goto fail;
195 	} else {
196 		ASSERT(minfo.vaddr != 0);
197 		dp->pub_addr = minfo.vaddr;
198 	}
199 
200 	dp->num_descriptors = vsw_num_descriptors;
201 	dp->descriptor_size = sizeof (vnet_rx_dringdata_desc_t);
202 	dp->options = VIO_RX_DRING_DATA;
203 	dp->dring_ncookies = 1;	/* guaranteed by ldc */
204 	dp->num_bufs = vsw_num_descriptors * vsw_nrbufs_factor;
205 
206 	/*
207 	 * Allocate a table that maps descriptor to its associated buffer;
208 	 * used while receiving to validate that the peer has not changed the
209 	 * buffer offset provided in the descriptor.
210 	 */
211 	dp->rxdp_to_vmp = kmem_zalloc(dp->num_descriptors * sizeof (uintptr_t),
212 	    KM_SLEEP);
213 
214 	/* Setup the descriptor ring */
215 	if (vsw_setup_rx_dring(ldcp, dp)) {
216 		DERR(vswp, "%s: unable to setup ring", __func__);
217 		goto fail;
218 	}
219 
220 	/*
221 	 * The descriptors and the associated buffers are all ready;
222 	 * now bind descriptor ring to the channel.
223 	 */
224 	if ((ldc_mem_dring_bind(ldcp->ldc_handle, dp->dring_handle,
225 	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
226 	    &dp->dring_cookie[0], &dp->dring_ncookies)) != 0) {
227 		DERR(vswp, "vsw_create_rx_dring: unable to bind to channel "
228 		    "%lld", ldcp->ldc_id);
229 		goto fail;
230 	}
231 
232 	/* haven't used any descriptors yet */
233 	dp->end_idx = 0;
234 	dp->last_ack_recv = -1;
235 	dp->next_rxi = 0;
236 	return (dp);
237 
238 fail:
239 	vsw_destroy_rx_dring(ldcp);
240 	return (NULL);
241 }
242 
243 /*
244  * Setup the descriptors in the rx dring.
245  * Returns 0 on success, 1 on failure.
246  */
247 static int
248 vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp)
249 {
250 	int				i, j;
251 	int				rv;
252 	size_t				data_sz;
253 	vio_mblk_t			*vmp;
254 	vio_mblk_t			**rxdp_to_vmp;
255 	vnet_rx_dringdata_desc_t	*rxdp;
256 	vnet_rx_dringdata_desc_t	*pub_addr;
257 	vsw_t				*vswp = ldcp->ldc_vswp;
258 	uint32_t			ncookies = 0;
259 	static char			*name = "vsw_setup_rx_dring";
260 	void				*data_addr = NULL;
261 
262 	/*
263 	 * Allocate a single large buffer that serves as the rx buffer area.
264 	 * We allocate a ldc memory handle and export the buffer area as shared
265 	 * memory. We send the ldc memcookie for this buffer space to the peer,
266 	 * as part of dring registration phase during handshake. We manage this
267 	 * buffer area as individual buffers of max_frame_size and provide
268 	 * specific buffer offsets in each descriptor to the peer. Note that
269 	 * the factor used to compute the # of buffers (above) must be > 1 to
270 	 * ensure that there are more buffers than the # of descriptors. This
271 	 * is needed because, while the shared memory buffers are sent up our
272 	 * stack during receive, the sender needs additional buffers that can
273 	 * be used for further transmits. This also means there is no one to
274 	 * one correspondence between the descriptor index and buffer offset.
275 	 * The sender has to read the buffer offset in the descriptor and use
276 	 * the specified offset to copy the tx data into the shared buffer. We
277 	 * (receiver) manage the individual buffers and their state (see
278 	 * VIO_MBLK_STATEs in vio_util.h).
279 	 */
280 	data_sz = vswp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
281 	data_sz = VNET_ROUNDUP_2K(data_sz);
282 
283 	dp->desc_data_sz = data_sz;
284 	dp->data_sz = (dp->num_bufs * data_sz);
285 	data_addr = kmem_zalloc(dp->data_sz, KM_SLEEP);
286 	dp->data_addr = data_addr;
287 
288 	D2(vswp, "%s: allocated %lld bytes at 0x%llx\n", name,
289 	    dp->data_sz, dp->data_addr);
290 
291 	/* Allocate a ldc memhandle for the entire rx data area */
292 	rv = ldc_mem_alloc_handle(ldcp->ldc_handle, &dp->data_handle);
293 	if (rv != 0) {
294 		DERR(vswp, "%s: alloc mem handle failed", name);
295 		goto fail;
296 	}
297 
298 	/* Allocate memory for the data cookies */
299 	dp->data_cookie = kmem_zalloc(VNET_DATA_AREA_COOKIES *
300 	    sizeof (ldc_mem_cookie_t), KM_SLEEP);
301 
302 	/*
303 	 * Bind ldc memhandle to the corresponding rx data area.
304 	 */
305 	rv = ldc_mem_bind_handle(dp->data_handle, (caddr_t)data_addr,
306 	    dp->data_sz, LDC_DIRECT_MAP, LDC_MEM_W,
307 	    dp->data_cookie, &ncookies);
308 	if (rv != 0) {
309 		DERR(vswp, "%s(%lld): ldc_mem_bind_handle failed "
310 		    "(rv %d)", name, ldcp->ldc_id, rv);
311 		goto fail;
312 	}
313 	if ((ncookies == 0) || (ncookies > VNET_DATA_AREA_COOKIES)) {
314 		goto fail;
315 	}
316 	dp->data_ncookies = ncookies;
317 
318 	for (j = 1; j < ncookies; j++) {
319 		rv = ldc_mem_nextcookie(dp->data_handle,
320 		    &(dp->data_cookie[j]));
321 		if (rv != 0) {
322 			DERR(vswp, "%s: ldc_mem_nextcookie "
323 			    "failed rv (%d)", name, rv);
324 			goto fail;
325 		}
326 	}
327 
328 	/*
329 	 * Successful in binding the handle to rx data area. Now setup mblks
330 	 * around each data buffer and setup the descriptors to point to these
331 	 * rx data buffers. We associate each descriptor with a buffer
332 	 * by specifying the buffer offset in the descriptor. When the peer
333 	 * needs to transmit data, this offset is read by the peer to determine
334 	 * the buffer in the mapped buffer area where the data to be
335 	 * transmitted should be copied, for a specific descriptor.
336 	 */
337 	rv = vio_create_mblks(dp->num_bufs, data_sz, (uint8_t *)data_addr,
338 	    &dp->rx_vmp);
339 	if (rv != 0) {
340 		goto fail;
341 	}
342 
343 	pub_addr = dp->pub_addr;
344 	rxdp_to_vmp = dp->rxdp_to_vmp;
345 	for (i = 0; i < dp->num_descriptors; i++) {
346 		rxdp = &pub_addr[i];
347 		/* allocate an mblk around this data buffer */
348 		vmp = vio_allocb(dp->rx_vmp);
349 		ASSERT(vmp != NULL);
350 		rxdp->data_buf_offset = VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN;
351 		rxdp->dstate = VIO_DESC_FREE;
352 		rxdp_to_vmp[i] = vmp;
353 	}
354 
355 	return (0);
356 
357 fail:
358 	/* return failure; caller will cleanup */
359 	return (1);
360 }
361 
362 /*
363  * Free receive resources for the channel.
364  */
365 void
366 vsw_destroy_rx_dring(vsw_ldc_t *ldcp)
367 {
368 	vsw_t		*vswp = ldcp->ldc_vswp;
369 	lane_t		*lp = &ldcp->lane_out;
370 	dring_info_t	*dp;
371 
372 	dp = lp->dringp;
373 	if (dp == NULL) {
374 		return;
375 	}
376 
377 	mutex_enter(&dp->dlock);
378 
379 	if (dp->rx_vmp != NULL) {
380 		vio_clobber_pool(dp->rx_vmp);
381 		/*
382 		 * If we can't destroy the rx pool for this channel, dispatch a
383 		 * task to retry and clean up those rx pools. Note that we
384 		 * don't need to wait for the task to complete. If the vsw
385 		 * device itself gets detached (vsw_detach()), it will wait for
386 		 * the task to complete implicitly in ddi_taskq_destroy().
387 		 */
388 		if (vio_destroy_mblks(dp->rx_vmp) != 0)  {
389 			(void) ddi_taskq_dispatch(vswp->rxp_taskq,
390 			    vsw_destroy_rxpools, dp->rx_vmp, DDI_SLEEP);
391 		}
392 	}
393 
394 	/* Free rx data area cookies */
395 	if (dp->data_cookie != NULL) {
396 		kmem_free(dp->data_cookie, VNET_DATA_AREA_COOKIES *
397 		    sizeof (ldc_mem_cookie_t));
398 		dp->data_cookie = NULL;
399 	}
400 
401 	/* Unbind rx data area memhandle */
402 	if (dp->data_ncookies != 0) {
403 		(void) ldc_mem_unbind_handle(dp->data_handle);
404 		dp->data_ncookies = 0;
405 	}
406 
407 	/* Free rx data area memhandle */
408 	if (dp->data_handle) {
409 		(void) ldc_mem_free_handle(dp->data_handle);
410 		dp->data_handle = 0;
411 	}
412 
413 	/* Now free the rx data area itself */
414 	if (dp->data_addr != NULL) {
415 		kmem_free(dp->data_addr, dp->data_sz);
416 	}
417 
418 	/* Finally, free the receive descriptor ring */
419 	if (dp->dring_handle != NULL) {
420 		(void) ldc_mem_dring_unbind(dp->dring_handle);
421 		(void) ldc_mem_dring_destroy(dp->dring_handle);
422 	}
423 
424 	if (dp->rxdp_to_vmp != NULL) {
425 		kmem_free(dp->rxdp_to_vmp,
426 		    dp->num_descriptors * sizeof (uintptr_t));
427 		dp->rxdp_to_vmp = NULL;
428 	}
429 
430 	mutex_exit(&dp->dlock);
431 	mutex_destroy(&dp->dlock);
432 	mutex_destroy(&dp->restart_lock);
433 	kmem_free(dp, sizeof (dring_info_t));
434 	lp->dringp = NULL;
435 }
436 
437 /*
438  * Map the receive descriptor ring exported by the peer, as our transmit
439  * descriptor ring.
440  */
441 dring_info_t *
442 vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt)
443 {
444 	int				i;
445 	int				rv;
446 	dring_info_t			*dp;
447 	vnet_rx_dringdata_desc_t	*txdp;
448 	on_trap_data_t			otd;
449 	vio_dring_reg_msg_t		*dring_pkt = pkt;
450 
451 	dp = vsw_map_dring_cmn(ldcp, dring_pkt);
452 	if (dp == NULL) {
453 		return (NULL);
454 	}
455 
456 	/* RxDringData mode specific initializations */
457 	mutex_init(&dp->txlock, NULL, MUTEX_DRIVER, NULL);
458 	mutex_init(&dp->restart_lock, NULL, MUTEX_DRIVER, NULL);
459 	dp->next_txi = dp->restart_peer_txi = 0;
460 	dp->restart_reqd = B_TRUE;
461 	ldcp->dringdata_msgid = 0;
462 	ldcp->lane_in.dringp = dp;
463 
464 	/*
465 	 * Mark the descriptor state as 'done'. This is implementation specific
466 	 * and not required by the protocol. In our implementation, we only
467 	 * need the descripor to be in 'done' state to be used by the transmit
468 	 * function and the peer is not aware of it. As the protocol requires
469 	 * that during initial registration the exporting end point mark the
470 	 * dstate as 'free', we change it 'done' here. After this, the dstate
471 	 * in our implementation will keep moving between 'ready', set by our
472 	 * transmit function; and and 'done', set by the peer (per protocol)
473 	 * after receiving data.
474 	 * Setup on_trap() protection before accessing dring shared memory area.
475 	 */
476 	rv = LDC_ON_TRAP(&otd);
477 	if (rv != 0) {
478 		/*
479 		 * Data access fault occured down the code path below while
480 		 * accessing the descriptors. Return failure.
481 		 */
482 		goto fail;
483 	}
484 
485 	txdp = (vnet_rx_dringdata_desc_t *)dp->pub_addr;
486 	for (i = 0; i < dp->num_descriptors; i++) {
487 		txdp[i].dstate = VIO_DESC_DONE;
488 	}
489 
490 	(void) LDC_NO_TRAP();
491 
492 	return (dp);
493 
494 fail:
495 	if (dp->dring_handle != NULL) {
496 		(void) ldc_mem_dring_unmap(dp->dring_handle);
497 	}
498 	kmem_free(dp, sizeof (*dp));
499 	return (NULL);
500 }
501 
502 /*
503  * Unmap the transmit descriptor ring.
504  */
505 void
506 vsw_unmap_tx_dring(vsw_ldc_t *ldcp)
507 {
508 	lane_t		*lp = &ldcp->lane_in;
509 	dring_info_t	*dp;
510 
511 	if ((dp = lp->dringp) == NULL) {
512 		return;
513 	}
514 
515 	/* Unmap tx data area and free data handle */
516 	if (dp->data_handle != NULL) {
517 		(void) ldc_mem_unmap(dp->data_handle);
518 		(void) ldc_mem_free_handle(dp->data_handle);
519 		dp->data_handle = NULL;
520 	}
521 
522 	/* Free tx data area cookies */
523 	if (dp->data_cookie != NULL) {
524 		kmem_free(dp->data_cookie, dp->data_ncookies *
525 		    sizeof (ldc_mem_cookie_t));
526 		dp->data_cookie = NULL;
527 		dp->data_ncookies = 0;
528 	}
529 
530 	/* Unmap peer's dring */
531 	if (dp->dring_handle != NULL) {
532 		(void) ldc_mem_dring_unmap(dp->dring_handle);
533 		dp->dring_handle = NULL;
534 	}
535 
536 	mutex_destroy(&dp->txlock);
537 	kmem_free(dp, sizeof (dring_info_t));
538 	lp->dringp = NULL;
539 }
540 
541 /*
542  * A per LDC worker thread to process the rx dring and receive packets. This
543  * thread is woken up by the LDC interrupt handler when a dring data info
544  * message is received.
545  */
546 void
547 vsw_ldc_rcv_worker(void *arg)
548 {
549 	callb_cpr_t	cprinfo;
550 	vsw_ldc_t	*ldcp = (vsw_ldc_t *)arg;
551 	vsw_t		*vswp = ldcp->ldc_vswp;
552 
553 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
554 	CALLB_CPR_INIT(&cprinfo, &ldcp->rcv_thr_lock, callb_generic_cpr,
555 	    "vsw_rcv_thread");
556 	mutex_enter(&ldcp->rcv_thr_lock);
557 	while (!(ldcp->rcv_thr_flags & VSW_WTHR_STOP)) {
558 
559 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
560 		/*
561 		 * Wait until the data is received or a stop
562 		 * request is received.
563 		 */
564 		while (!(ldcp->rcv_thr_flags &
565 		    (VSW_WTHR_DATARCVD | VSW_WTHR_STOP))) {
566 			cv_wait(&ldcp->rcv_thr_cv, &ldcp->rcv_thr_lock);
567 		}
568 		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->rcv_thr_lock)
569 
570 		/*
571 		 * First process the stop request.
572 		 */
573 		if (ldcp->rcv_thr_flags & VSW_WTHR_STOP) {
574 			D2(vswp, "%s(%lld):Rx thread stopped\n",
575 			    __func__, ldcp->ldc_id);
576 			break;
577 		}
578 		ldcp->rcv_thr_flags &= ~VSW_WTHR_DATARCVD;
579 		mutex_exit(&ldcp->rcv_thr_lock);
580 		D1(vswp, "%s(%lld):calling vsw_process_pkt\n",
581 		    __func__, ldcp->ldc_id);
582 		vsw_ldc_rcv_shm(ldcp);
583 		mutex_enter(&ldcp->rcv_thr_lock);
584 	}
585 
586 	/*
587 	 * Update the run status and wakeup the thread that
588 	 * has sent the stop request.
589 	 */
590 	ldcp->rcv_thr_flags &= ~VSW_WTHR_STOP;
591 	ldcp->rcv_thread = NULL;
592 	CALLB_CPR_EXIT(&cprinfo);
593 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
594 	thread_exit();
595 }
596 
597 /*
598  * Process the rx descriptor ring in the context of receive worker
599  * thread and switch the received packets to their destinations.
600  */
601 static void
602 vsw_ldc_rcv_shm(vsw_ldc_t *ldcp)
603 {
604 	int		rv;
605 	uint32_t	end_ix;
606 	vio_dring_msg_t msg;
607 	vio_dring_msg_t	*msgp = &msg;
608 	int		count = 0;
609 	int		total_count = 0;
610 	uint32_t	retries = 0;
611 	mblk_t		*bp = NULL;
612 	mblk_t		*bpt = NULL;
613 	mblk_t		*mp = NULL;
614 	vsw_t		*vswp = ldcp->ldc_vswp;
615 	lane_t		*lp = &ldcp->lane_out;
616 	dring_info_t	*dp = lp->dringp;
617 
618 	do {
619 again:
620 		rv = vsw_receive_packet(ldcp, &mp);
621 		if (rv != 0) {
622 			if (rv == EINVAL) {
623 				/* Invalid descriptor error; get next */
624 				continue;
625 			}
626 			if (rv != EAGAIN) {
627 				break;
628 			}
629 
630 			/* Descriptor not ready for processsing */
631 			if (retries == vsw_recv_retries) {
632 				DTRACE_PROBE1(vsw_noready_rxds,
633 				    vsw_ldc_t *, ldcp);
634 				break;
635 			}
636 
637 			/* Switch packets received so far before retrying */
638 			if (bp != NULL) {
639 				VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
640 				    total_count);
641 			}
642 			retries++;
643 			drv_usecwait(vsw_recv_delay);
644 			goto again;
645 		}
646 		retries = 0;
647 
648 		/* Build a chain of received packets */
649 		if (bp == NULL) {
650 			/* first pkt */
651 			bp = mp;
652 			bpt = bp;
653 			bpt->b_next = NULL;
654 		} else {
655 			mp->b_next = NULL;
656 			bpt->b_next = mp;
657 			bpt = mp;
658 		}
659 
660 		total_count++;
661 		count++;
662 
663 		/*
664 		 * If we have gathered vsw_chain_len (tunable)
665 		 * # of packets in the chain, switch them.
666 		 */
667 		if (count == vsw_chain_len) {
668 			VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
669 			    total_count);
670 		}
671 
672 		/*
673 		 * Stop further processing if we processed the entire dring
674 		 * once; otherwise continue.
675 		 */
676 	} while (total_count < dp->num_bufs);
677 
678 	DTRACE_PROBE2(vsw_rx_total_count, vsw_ldc_t *, ldcp,
679 	    int, (total_count));
680 	if (bp != NULL) {
681 		VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
682 		    total_count);
683 	}
684 
685 	/* Send stopped signal to peer (sender) */
686 	end_ix = lp->dringp->next_rxi;
687 	DECR_RXI(dp, end_ix);
688 	msgp->tag.vio_msgtype = VIO_TYPE_DATA;
689 	msgp->tag.vio_subtype = VIO_SUBTYPE_ACK;
690 	msgp->tag.vio_subtype_env = VIO_DRING_DATA;
691 	msgp->dring_ident = ldcp->lane_in.dringp->ident;
692 	msgp->tag.vio_sid = ldcp->local_session;
693 	msgp->dring_process_state = VIO_DP_STOPPED;
694 	msgp->start_idx = VNET_START_IDX_UNSPEC;
695 	msgp->end_idx = end_ix;
696 
697 	(void) vsw_send_msg_shm(ldcp, (void *)msgp,
698 	    sizeof (vio_dring_msg_t), B_TRUE);
699 
700 	ldcp->ldc_stats.dring_data_acks_sent++;
701 	ldcp->ldc_stats.dring_stopped_acks_sent++;
702 }
703 
704 /*
705  * Process the next index in the rx dring and receive the associated packet.
706  *
707  * Returns:
708  *	bp:	Success: The received packet.
709  *		Failure: NULL
710  *      retval:
711  *		Success: 0
712  *		Failure: EAGAIN: Descriptor not ready
713  *			 EIO:    Descriptor contents invalid.
714  */
715 static int
716 vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp)
717 {
718 	uint32_t			rxi;
719 	vio_mblk_t			*vmp;
720 	vio_mblk_t			*new_vmp;
721 	struct ether_header		*ehp;
722 	vnet_rx_dringdata_desc_t	*rxdp;
723 	int				err = 0;
724 	uint_t				nbytes = 0;
725 	mblk_t				*mp = NULL;
726 	mblk_t				*dmp = NULL;
727 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
728 	dring_info_t			*dp = ldcp->lane_out.dringp;
729 	vnet_rx_dringdata_desc_t	*pub_addr = dp->pub_addr;
730 
731 	rxi = dp->next_rxi;
732 	rxdp = &(pub_addr[rxi]);
733 	vmp = dp->rxdp_to_vmp[rxi];
734 
735 	if (rxdp->dstate != VIO_DESC_READY) {
736 		/*
737 		 * Descriptor is not ready.
738 		 */
739 		return (EAGAIN);
740 	}
741 
742 	/*
743 	 * Ensure load ordering of dstate and nbytes.
744 	 */
745 	MEMBAR_CONSUMER();
746 
747 	if ((rxdp->nbytes < ETHERMIN) ||
748 	    (rxdp->nbytes > ldcp->lane_in.mtu) ||
749 	    (rxdp->data_buf_offset !=
750 	    (VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN))) {
751 		/*
752 		 * Descriptor contents invalid.
753 		 */
754 		statsp->ierrors++;
755 		rxdp->dstate = VIO_DESC_DONE;
756 		err = EIO;
757 		goto done;
758 	}
759 
760 	/*
761 	 * Now allocate a new buffer for this descriptor before sending up the
762 	 * buffer being processed. If that fails, stop processing; as we are
763 	 * out of receive buffers.
764 	 */
765 	new_vmp = vio_allocb(dp->rx_vmp);
766 
767 	/*
768 	 * Process the current buffer being received.
769 	 */
770 	nbytes = rxdp->nbytes;
771 	mp = vmp->mp;
772 
773 	if (new_vmp == NULL) {
774 		/*
775 		 * We failed to get a new mapped buffer that is needed to
776 		 * refill the descriptor. In that case, leave the current
777 		 * buffer bound to the descriptor; allocate an mblk dynamically
778 		 * and copy the contents of the buffer to the mblk. Then send
779 		 * up this mblk. This way the sender has the same buffer as
780 		 * before that can be used to send new data.
781 		 */
782 		statsp->norcvbuf++;
783 		dmp = allocb(nbytes + VNET_IPALIGN, BPRI_MED);
784 		bcopy(mp->b_rptr + VNET_IPALIGN,
785 		    dmp->b_rptr + VNET_IPALIGN, nbytes);
786 		mp = dmp;
787 	} else {
788 		/* Mark the status of the current rbuf */
789 		vmp->state = VIO_MBLK_HAS_DATA;
790 
791 		/* Set the offset of the new buffer in the descriptor */
792 		rxdp->data_buf_offset =
793 		    VIO_MBLK_DATA_OFF(new_vmp) + VNET_IPALIGN;
794 		dp->rxdp_to_vmp[rxi] = new_vmp;
795 	}
796 	mp->b_rptr += VNET_IPALIGN;
797 	mp->b_wptr = mp->b_rptr + nbytes;
798 
799 	/*
800 	 * Ensure store ordering of data_buf_offset and dstate; so that the
801 	 * peer sees the right data_buf_offset after it checks that the dstate
802 	 * is DONE.
803 	 */
804 	MEMBAR_PRODUCER();
805 
806 	/* Now mark the descriptor 'done' */
807 	rxdp->dstate = VIO_DESC_DONE;
808 
809 	/* Update stats */
810 	statsp->ipackets++;
811 	statsp->rbytes += rxdp->nbytes;
812 	ehp = (struct ether_header *)mp->b_rptr;
813 	if (IS_BROADCAST(ehp))
814 		statsp->brdcstrcv++;
815 	else if (IS_MULTICAST(ehp))
816 		statsp->multircv++;
817 done:
818 	/* Update the next index to be processed */
819 	INCR_RXI(dp, rxi);
820 
821 	/* Save the new recv index */
822 	dp->next_rxi = rxi;
823 
824 	/* Return the packet received */
825 	*bp = mp;
826 	return (err);
827 }
828 
829 void
830 vsw_stop_rcv_thread(vsw_ldc_t *ldcp)
831 {
832 	kt_did_t	tid = 0;
833 	vsw_t		*vswp = ldcp->ldc_vswp;
834 
835 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
836 	/*
837 	 * Send a stop request by setting the stop flag and
838 	 * wait until the rcv process thread stops.
839 	 */
840 	mutex_enter(&ldcp->rcv_thr_lock);
841 	if (ldcp->rcv_thread != NULL) {
842 		tid = ldcp->rcv_thread->t_did;
843 		ldcp->rcv_thr_flags |= VSW_WTHR_STOP;
844 		cv_signal(&ldcp->rcv_thr_cv);
845 	}
846 	mutex_exit(&ldcp->rcv_thr_lock);
847 
848 	if (tid != 0) {
849 		thread_join(tid);
850 	}
851 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
852 }
853 
854 int
855 vsw_dringsend_shm(vsw_ldc_t *ldcp, mblk_t *mp)
856 {
857 	uint32_t			next_txi;
858 	uint32_t			txi;
859 	vnet_rx_dringdata_desc_t	*txdp;
860 	struct ether_header		*ehp;
861 	size_t				mblksz;
862 	caddr_t				dst;
863 	mblk_t				*bp;
864 	size_t				size;
865 	on_trap_data_t			otd;
866 	uint32_t			buf_offset;
867 	vnet_rx_dringdata_desc_t	*pub_addr;
868 	vio_dring_msg_t			msg;
869 	vio_dring_msg_t			*msgp = &msg;
870 	int				rv = 0;
871 	boolean_t			resched_peer = B_FALSE;
872 	boolean_t			is_bcast = B_FALSE;
873 	boolean_t			is_mcast = B_FALSE;
874 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
875 	lane_t				*lane_in = &ldcp->lane_in;
876 	lane_t				*lane_out = &ldcp->lane_out;
877 	dring_info_t			*dp = lane_in->dringp;
878 	vsw_t				*vswp = ldcp->ldc_vswp;
879 
880 	if ((!(lane_in->lstate & VSW_LANE_ACTIVE)) ||
881 	    (ldcp->ldc_status != LDC_UP) || (ldcp->ldc_handle == NULL)) {
882 		DWARN(vswp, "%s(%lld) status(%d) lstate(0x%llx), dropping "
883 		    "packet\n", __func__, ldcp->ldc_id, ldcp->ldc_status,
884 		    lane_in->lstate);
885 		statsp->oerrors++;
886 		return (LDC_TX_FAILURE);
887 	}
888 
889 	if (dp == NULL) {
890 		DERR(vswp, "%s(%lld): no dring for outbound lane on"
891 		    " channel %d", __func__, ldcp->ldc_id, ldcp->ldc_id);
892 		statsp->oerrors++;
893 		return (LDC_TX_FAILURE);
894 	}
895 	pub_addr = dp->pub_addr;
896 
897 	size = msgsize(mp);
898 
899 	/*
900 	 * Note: In RxDringData mode, lane_in is associated with transmit and
901 	 * lane_out is associated with receive. However, we still keep the
902 	 * negotiated mtu in lane_out (our exported attributes).
903 	 */
904 	if (size > (size_t)lane_out->mtu) {
905 		DERR(vswp, "%s(%lld) invalid size (%ld)\n", __func__,
906 		    ldcp->ldc_id, size);
907 		statsp->oerrors++;
908 		return (LDC_TX_FAILURE);
909 	}
910 
911 	if (size < ETHERMIN)
912 		size = ETHERMIN;
913 
914 	ehp = (struct ether_header *)mp->b_rptr;
915 	is_bcast = IS_BROADCAST(ehp);
916 	is_mcast = IS_MULTICAST(ehp);
917 
918 	/*
919 	 * Setup on_trap() protection before accessing shared memory areas
920 	 * (descriptor and data buffer). Note that we enable this protection a
921 	 * little early and turn it off slightly later, than keeping it enabled
922 	 * strictly at the points in code below where the descriptor and data
923 	 * buffer are accessed. This is done for performance reasons:
924 	 * (a) to avoid calling the trap protection code while holding mutex.
925 	 * (b) to avoid multiple on/off steps for descriptor and data accesses.
926 	 */
927 	rv = LDC_ON_TRAP(&otd);
928 	if (rv != 0) {
929 		/*
930 		 * Data access fault occured down the code path below while
931 		 * accessing either the descriptor or the data buffer. Release
932 		 * any locks that we might have acquired in the code below and
933 		 * return failure.
934 		 */
935 		DERR(vswp, "%s(%lld) data access fault occured\n",
936 		    __func__, ldcp->ldc_id);
937 		statsp->oerrors++;
938 		if (mutex_owned(&dp->txlock)) {
939 			mutex_exit(&dp->txlock);
940 		}
941 		if (mutex_owned(&dp->restart_lock)) {
942 			mutex_exit(&dp->restart_lock);
943 		}
944 		goto dringsend_shm_exit;
945 	}
946 
947 	/*
948 	 * Allocate a descriptor
949 	 */
950 	mutex_enter(&dp->txlock);
951 	txi = next_txi = dp->next_txi;
952 	INCR_TXI(dp, next_txi);
953 	txdp = &(pub_addr[txi]);
954 	if (txdp->dstate != VIO_DESC_DONE) { /* out of descriptors */
955 		statsp->tx_no_desc++;
956 		mutex_exit(&dp->txlock);
957 		(void) LDC_NO_TRAP();
958 		return (LDC_TX_NORESOURCES);
959 	} else {
960 		txdp->dstate = VIO_DESC_INITIALIZING;
961 	}
962 
963 	/* Update descriptor ring index */
964 	dp->next_txi = next_txi;
965 	mutex_exit(&dp->txlock);
966 
967 	/* Ensure load ordering of dstate (above) and data_buf_offset. */
968 	MEMBAR_CONSUMER();
969 
970 	/* Get the offset of the buffer to be used */
971 	buf_offset = txdp->data_buf_offset;
972 
973 	/* Access the buffer using the offset */
974 	dst = (caddr_t)dp->data_addr + buf_offset;
975 
976 	/* Copy data into mapped transmit buffer */
977 	for (bp = mp; bp != NULL; bp = bp->b_cont) {
978 		mblksz = MBLKL(bp);
979 		bcopy(bp->b_rptr, dst, mblksz);
980 		dst += mblksz;
981 	}
982 
983 	/* Set the size of data in the descriptor */
984 	txdp->nbytes = size;
985 
986 	/*
987 	 * Ensure store ordering of nbytes and dstate (below); so that the peer
988 	 * sees the right nbytes value after it checks that the dstate is READY.
989 	 */
990 	MEMBAR_PRODUCER();
991 
992 	mutex_enter(&dp->restart_lock);
993 
994 	ASSERT(txdp->dstate == VIO_DESC_INITIALIZING);
995 
996 	/* Mark the descriptor ready */
997 	txdp->dstate = VIO_DESC_READY;
998 
999 	/* Check if peer needs wake up (handled below) */
1000 	if (dp->restart_reqd == B_TRUE && dp->restart_peer_txi == txi) {
1001 		dp->restart_reqd = B_FALSE;
1002 		resched_peer = B_TRUE;
1003 	}
1004 
1005 	/* Update tx stats */
1006 	statsp->opackets++;
1007 	statsp->obytes += size;
1008 	if (is_bcast)
1009 		statsp->brdcstxmt++;
1010 	else if (is_mcast)
1011 		statsp->multixmt++;
1012 
1013 	mutex_exit(&dp->restart_lock);
1014 
1015 	/*
1016 	 * We are done accessing shared memory; clear trap protection.
1017 	 */
1018 	(void) LDC_NO_TRAP();
1019 
1020 	/*
1021 	 * Need to wake up the peer ?
1022 	 */
1023 	if (resched_peer == B_TRUE) {
1024 		msgp->tag.vio_msgtype = VIO_TYPE_DATA;
1025 		msgp->tag.vio_subtype = VIO_SUBTYPE_INFO;
1026 		msgp->tag.vio_subtype_env = VIO_DRING_DATA;
1027 		msgp->tag.vio_sid = ldcp->local_session;
1028 		msgp->dring_ident = lane_out->dringp->ident;
1029 		msgp->start_idx = txi;
1030 		msgp->end_idx = -1;
1031 
1032 		rv = vsw_send_msg_shm(ldcp, (void *)msgp, sizeof (*msgp),
1033 		    B_FALSE);
1034 		if (rv != 0) {
1035 			/* error: drop the packet */
1036 			DERR(vswp, "%s(%lld) failed sending dringdata msg\n",
1037 			    __func__, ldcp->ldc_id);
1038 			mutex_enter(&dp->restart_lock);
1039 			statsp->oerrors++;
1040 			dp->restart_reqd = B_TRUE;
1041 			mutex_exit(&dp->restart_lock);
1042 		}
1043 		statsp->dring_data_msgs_sent++;
1044 	}
1045 
1046 dringsend_shm_exit:
1047 	if (rv == ECONNRESET || rv == EACCES) {
1048 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1049 	}
1050 	return (LDC_TX_SUCCESS);
1051 }
1052 
1053 void
1054 vsw_process_dringdata_shm(void *arg, void *dpkt)
1055 {
1056 	vsw_ldc_t		*ldcp = arg;
1057 	vsw_t			*vswp = ldcp->ldc_vswp;
1058 	vio_dring_msg_t		*dring_pkt = dpkt;
1059 
1060 	switch (dring_pkt->tag.vio_subtype) {
1061 	case VIO_SUBTYPE_INFO:
1062 		D2(vswp, "%s(%lld): VIO_SUBTYPE_INFO", __func__, ldcp->ldc_id);
1063 		vsw_process_dringdata_info_shm(ldcp, dring_pkt);
1064 		break;
1065 
1066 	case VIO_SUBTYPE_ACK:
1067 		D2(vswp, "%s(%lld): VIO_SUBTYPE_ACK", __func__, ldcp->ldc_id);
1068 		vsw_process_dringdata_ack_shm(ldcp, dring_pkt);
1069 		break;
1070 
1071 	case VIO_SUBTYPE_NACK:
1072 		DWARN(vswp, "%s(%lld): VIO_SUBTYPE_NACK",
1073 		    __func__, ldcp->ldc_id);
1074 		/*
1075 		 * Something is badly wrong if we are getting NACK's
1076 		 * for our data pkts. So reset the channel.
1077 		 */
1078 		vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
1079 		break;
1080 
1081 	default:
1082 		DERR(vswp, "%s(%lld): Unknown vio_subtype %x\n", __func__,
1083 		    ldcp->ldc_id, dring_pkt->tag.vio_subtype);
1084 	}
1085 }
1086 
1087 static void
1088 vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1089 {
1090 	dring_info_t	*dp = ldcp->lane_in.dringp;
1091 	vsw_t		*vswp = ldcp->ldc_vswp;
1092 	vgen_stats_t	*statsp = &ldcp->ldc_stats;
1093 
1094 	if (dp->ident != msg->dring_ident) {
1095 		/* drop the message */
1096 		DERR(vswp, "%s(%lld): Invalid dring ident 0x%llx",
1097 		    __func__, ldcp->ldc_id, msg->dring_ident);
1098 		return;
1099 	}
1100 
1101 	statsp->dring_data_msgs_rcvd++;
1102 
1103 	/*
1104 	 * Wake up the rcv worker thread to process the rx dring.
1105 	 */
1106 	ASSERT(MUTEX_HELD(&ldcp->ldc_cblock));
1107 	mutex_exit(&ldcp->ldc_cblock);
1108 	mutex_enter(&ldcp->rcv_thr_lock);
1109 	if (!(ldcp->rcv_thr_flags & VSW_WTHR_DATARCVD)) {
1110 		ldcp->rcv_thr_flags |= VSW_WTHR_DATARCVD;
1111 		cv_signal(&ldcp->rcv_thr_cv);
1112 	}
1113 	mutex_exit(&ldcp->rcv_thr_lock);
1114 	mutex_enter(&ldcp->ldc_cblock);
1115 }
1116 
1117 static void
1118 vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1119 {
1120 	dring_info_t			*dp;
1121 	uint32_t			start;
1122 	int32_t				end;
1123 	int				rv;
1124 	on_trap_data_t			otd;
1125 	uint32_t			txi;
1126 	vnet_rx_dringdata_desc_t	*txdp;
1127 	vnet_rx_dringdata_desc_t	*pub_addr;
1128 	boolean_t			ready_txd = B_FALSE;
1129 	vsw_t				*vswp = ldcp->ldc_vswp;
1130 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
1131 
1132 	dp = ldcp->lane_in.dringp;
1133 	start = msg->start_idx;
1134 	end = msg->end_idx;
1135 	pub_addr = dp->pub_addr;
1136 
1137 	/*
1138 	 * In RxDringData mode (v1.6), start index of -1 can be used by the
1139 	 * peer to indicate that it is unspecified. However, the end index
1140 	 * must be set correctly indicating the last descriptor index processed.
1141 	 */
1142 	if (((start != VNET_START_IDX_UNSPEC) && !(CHECK_TXI(dp, start))) ||
1143 	    !(CHECK_TXI(dp, end))) {
1144 		/* drop the message if invalid index */
1145 		DWARN(vswp, "%s(%lld): Invalid Tx ack start(%d) or end(%d)\n",
1146 		    __func__, ldcp->ldc_id, start, end);
1147 		return;
1148 	}
1149 
1150 	/* Validate dring_ident */
1151 	if (msg->dring_ident != ldcp->lane_out.dringp->ident) {
1152 		/* invalid dring_ident, drop the msg */
1153 		DWARN(vswp, "%s(%lld): Invalid dring ident 0x%x\n",
1154 		    __func__, ldcp->ldc_id, msg->dring_ident);
1155 		return;
1156 	}
1157 	statsp->dring_data_acks_rcvd++;
1158 
1159 	if (msg->dring_process_state != VIO_DP_STOPPED) {
1160 		/*
1161 		 * Receiver continued processing
1162 		 * dring after sending us the ack.
1163 		 */
1164 		return;
1165 	}
1166 
1167 	statsp->dring_stopped_acks_rcvd++;
1168 
1169 	/*
1170 	 * Setup on_trap() protection before accessing dring shared memory area.
1171 	 */
1172 	rv = LDC_ON_TRAP(&otd);
1173 	if (rv != 0) {
1174 		/*
1175 		 * Data access fault occured down the code path below while
1176 		 * accessing the descriptors. Release any locks that we might
1177 		 * have acquired in the code below and return failure.
1178 		 */
1179 		if (mutex_owned(&dp->restart_lock)) {
1180 			mutex_exit(&dp->restart_lock);
1181 		}
1182 		return;
1183 	}
1184 
1185 	/*
1186 	 * Determine if there are any pending tx descriptors ready to be
1187 	 * processed by the receiver(peer) and if so, send a message to the
1188 	 * peer to restart receiving.
1189 	 */
1190 	mutex_enter(&dp->restart_lock);
1191 
1192 	ready_txd = B_FALSE;
1193 	txi = end;
1194 	INCR_TXI(dp, txi);
1195 	txdp = &pub_addr[txi];
1196 	if (txdp->dstate == VIO_DESC_READY) {
1197 		ready_txd = B_TRUE;
1198 	}
1199 
1200 	/*
1201 	 * We are done accessing shared memory; clear trap protection.
1202 	 */
1203 	(void) LDC_NO_TRAP();
1204 
1205 	if (ready_txd == B_FALSE) {
1206 		/*
1207 		 * No ready tx descriptors. Set the flag to send a message to
1208 		 * the peer when tx descriptors are ready in transmit routine.
1209 		 */
1210 		dp->restart_reqd = B_TRUE;
1211 		dp->restart_peer_txi = txi;
1212 		mutex_exit(&dp->restart_lock);
1213 		return;
1214 	}
1215 
1216 	/*
1217 	 * We have some tx descriptors ready to be processed by the receiver.
1218 	 * Send a dring data message to the peer to restart processing.
1219 	 */
1220 	dp->restart_reqd = B_FALSE;
1221 	mutex_exit(&dp->restart_lock);
1222 
1223 	msg->tag.vio_msgtype = VIO_TYPE_DATA;
1224 	msg->tag.vio_subtype = VIO_SUBTYPE_INFO;
1225 	msg->tag.vio_subtype_env = VIO_DRING_DATA;
1226 	msg->tag.vio_sid = ldcp->local_session;
1227 	msg->dring_ident = ldcp->lane_out.dringp->ident;
1228 	msg->start_idx = txi;
1229 	msg->end_idx = -1;
1230 	rv = vsw_send_msg_shm(ldcp, (void *)msg,
1231 	    sizeof (vio_dring_msg_t), B_FALSE);
1232 	statsp->dring_data_msgs_sent++;
1233 	if (rv != 0) {
1234 		mutex_enter(&dp->restart_lock);
1235 		dp->restart_reqd = B_TRUE;
1236 		mutex_exit(&dp->restart_lock);
1237 	}
1238 
1239 	if (rv == ECONNRESET) {
1240 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1241 	}
1242 }
1243 
1244 /*
1245  * Send dring data msgs (info/ack/nack) over LDC.
1246  */
1247 int
1248 vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size, boolean_t handle_reset)
1249 {
1250 	int			rv;
1251 	int			retries = vsw_wretries;
1252 	size_t			msglen = size;
1253 	vsw_t			*vswp = ldcp->ldc_vswp;
1254 	vio_dring_msg_t		*dmsg = (vio_dring_msg_t *)msgp;
1255 
1256 	D1(vswp, "vsw_send_msg (%lld) enter : sending %d bytes",
1257 	    ldcp->ldc_id, size);
1258 
1259 	dmsg->seq_num = atomic_inc_32_nv(&ldcp->dringdata_msgid);
1260 
1261 	do {
1262 		msglen = size;
1263 		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msgp, &msglen);
1264 	} while (rv == EWOULDBLOCK && --retries > 0);
1265 
1266 	if ((rv != 0) || (msglen != size)) {
1267 		DERR(vswp, "vsw_send_msg_shm:ldc_write failed: "
1268 		    "chan(%lld) rv(%d) size (%d) msglen(%d)\n",
1269 		    ldcp->ldc_id, rv, size, msglen);
1270 		ldcp->ldc_stats.oerrors++;
1271 	}
1272 
1273 	/*
1274 	 * If channel has been reset we either handle it here or
1275 	 * simply report back that it has been reset and let caller
1276 	 * decide what to do.
1277 	 */
1278 	if (rv == ECONNRESET) {
1279 		DWARN(vswp, "%s (%lld) channel reset", __func__, ldcp->ldc_id);
1280 
1281 		if (handle_reset) {
1282 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1283 		}
1284 	}
1285 
1286 	return (rv);
1287 }
1288