xref: /titanic_51/usr/src/uts/sun4v/io/vsw_rxdring.c (revision cb15d5d96b3b2730714c28bfe06cfe7421758b8c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 #include <sys/types.h>
27 #include <sys/errno.h>
28 #include <sys/sysmacros.h>
29 #include <sys/param.h>
30 #include <sys/machsystm.h>
31 #include <sys/stream.h>
32 #include <sys/strsubr.h>
33 #include <sys/kmem.h>
34 #include <sys/strsun.h>
35 #include <sys/callb.h>
36 #include <sys/sdt.h>
37 #include <sys/mach_descrip.h>
38 #include <sys/mdeg.h>
39 #include <net/if.h>
40 #include <sys/vsw.h>
41 #include <sys/vio_mailbox.h>
42 #include <sys/vio_common.h>
43 #include <sys/vnet_common.h>
44 #include <sys/vnet_mailbox.h>
45 #include <sys/vio_util.h>
46 
47 /*
48  * This file contains the implementation of RxDringData transfer mode of VIO
49  * Protocol in vsw. The functions in this file are invoked from vsw_ldc.c
50  * after RxDringData mode is negotiated with the peer during attribute phase of
51  * handshake. This file contains functions that setup the transmit and receive
52  * descriptor rings, and associated resources in RxDringData mode. It also
53  * contains the transmit and receive data processing functions that are invoked
54  * in RxDringData mode. The data processing routines in this file have the
55  * suffix '_shm' to indicate the shared memory mechanism used in RxDringData
56  * mode.
57  */
58 
59 /* Functions exported to vsw_ldc.c */
60 vio_dring_reg_msg_t *vsw_create_rx_dring_info(vsw_ldc_t *);
61 void vsw_destroy_rx_dring(vsw_ldc_t *ldcp);
62 dring_info_t *vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt);
63 void vsw_unmap_tx_dring(vsw_ldc_t *ldcp);
64 int vsw_dringsend_shm(vsw_ldc_t *, mblk_t *);
65 void vsw_ldc_rcv_worker(void *arg);
66 void vsw_stop_rcv_thread(vsw_ldc_t *ldcp);
67 void vsw_process_dringdata_shm(void *, void *);
68 
69 /* Internal functions */
70 static dring_info_t *vsw_create_rx_dring(vsw_ldc_t *);
71 static int vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp);
72 static void vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp,
73 	vio_dring_msg_t *msg);
74 static void vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp,
75 	vio_dring_msg_t *msg);
76 static void vsw_ldc_rcv_shm(vsw_ldc_t *ldcp);
77 static int vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp);
78 static int vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size,
79     boolean_t handle_reset);
80 
81 /* Functions imported from vsw_ldc.c */
82 extern void vsw_process_pkt(void *);
83 extern void vsw_destroy_rxpools(void *);
84 extern dring_info_t *vsw_map_dring_cmn(vsw_ldc_t *ldcp,
85     vio_dring_reg_msg_t *dring_pkt);
86 extern void vsw_process_conn_evt(vsw_ldc_t *, uint16_t);
87 extern mblk_t *vsw_vlan_frame_pretag(void *arg, int type, mblk_t *mp);
88 
89 /* Tunables */
90 extern int vsw_wretries;
91 extern int vsw_recv_delay;
92 extern int vsw_recv_retries;
93 extern uint32_t vsw_chain_len;
94 extern uint32_t vsw_num_descriptors;
95 extern uint32_t vsw_nrbufs_factor;
96 
/*
 * Hand the chain of received frames headed by 'bp' to the switching function
 * of 'vswp' on behalf of the port that owns 'ldcp', then reset the chain head
 * 'bp', the chain tail 'bpt' and the per-chain frame counter 'count' so that
 * the caller can start building a new chain. 'total_count' is accepted for
 * interface compatibility only; it is neither read nor modified.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement and remains safe inside unbraced if/else constructs. Invoke it
 * with a trailing semicolon.
 */
#define	VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count, total_count)	\
do {									\
	DTRACE_PROBE2(vsw_rx_pkts, vsw_ldc_t *, (ldcp), int, (count));	\
	(vswp)->vsw_switch_frame((vswp), (bp), VSW_VNETPORT,		\
	    (ldcp)->ldc_port, NULL);					\
	(bp) = (bpt) = NULL;						\
	(count) = 0;							\
} while (0)
105 
106 vio_dring_reg_msg_t *
107 vsw_create_rx_dring_info(vsw_ldc_t *ldcp)
108 {
109 	vio_dring_reg_msg_t	*mp;
110 	vio_dring_reg_ext_msg_t	*emsg;
111 	dring_info_t		*dp;
112 	uint8_t			*buf;
113 	vsw_t			*vswp = ldcp->ldc_vswp;
114 
115 	D1(vswp, "%s enter\n", __func__);
116 
117 	/*
118 	 * If we can't create a dring, obviously no point sending
119 	 * a message.
120 	 */
121 	if ((dp = vsw_create_rx_dring(ldcp)) == NULL)
122 		return (NULL);
123 
124 	mp = kmem_zalloc(VNET_DRING_REG_EXT_MSG_SIZE(dp->data_ncookies),
125 	    KM_SLEEP);
126 
127 	mp->tag.vio_msgtype = VIO_TYPE_CTRL;
128 	mp->tag.vio_subtype = VIO_SUBTYPE_INFO;
129 	mp->tag.vio_subtype_env = VIO_DRING_REG;
130 	mp->tag.vio_sid = ldcp->local_session;
131 
132 	/* payload */
133 	mp->num_descriptors = dp->num_descriptors;
134 	mp->descriptor_size = dp->descriptor_size;
135 	mp->options = dp->options;
136 	mp->ncookies = dp->dring_ncookies;
137 	bcopy(&dp->dring_cookie[0], &mp->cookie[0],
138 	    sizeof (ldc_mem_cookie_t));
139 
140 	mp->dring_ident = 0;
141 
142 	buf = (uint8_t *)mp->cookie;
143 
144 	/* skip over dring cookies */
145 	ASSERT(mp->ncookies == 1);
146 	buf += (mp->ncookies * sizeof (ldc_mem_cookie_t));
147 
148 	emsg = (vio_dring_reg_ext_msg_t *)buf;
149 
150 	/* copy data_ncookies in the msg */
151 	emsg->data_ncookies = dp->data_ncookies;
152 
153 	/* copy data area size in the msg */
154 	emsg->data_area_size = dp->data_sz;
155 
156 	/* copy data area cookies in the msg */
157 	bcopy(dp->data_cookie, (ldc_mem_cookie_t *)emsg->data_cookie,
158 	    sizeof (ldc_mem_cookie_t) * dp->data_ncookies);
159 
160 	D1(vswp, "%s exit\n", __func__);
161 
162 	return (mp);
163 }
164 
165 /*
166  * Allocate receive resources for the channel. The resources consist of a
167  * receive descriptor ring and an associated receive buffer area.
168  */
169 static dring_info_t *
170 vsw_create_rx_dring(vsw_ldc_t *ldcp)
171 {
172 	vsw_t			*vswp = ldcp->ldc_vswp;
173 	ldc_mem_info_t		minfo;
174 	dring_info_t		*dp;
175 
176 	dp = (dring_info_t *)kmem_zalloc(sizeof (dring_info_t), KM_SLEEP);
177 	mutex_init(&dp->dlock, NULL, MUTEX_DRIVER, NULL);
178 	ldcp->lane_out.dringp = dp;
179 
180 	/* Create the receive descriptor ring */
181 	if ((ldc_mem_dring_create(vsw_num_descriptors,
182 	    sizeof (vnet_rx_dringdata_desc_t), &dp->dring_handle)) != 0) {
183 		DERR(vswp, "vsw_create_rx_dring(%lld): ldc dring create "
184 		    "failed", ldcp->ldc_id);
185 		goto fail;
186 	}
187 
188 	ASSERT(dp->dring_handle != NULL);
189 
190 	/* Get the addr of descriptor ring */
191 	if ((ldc_mem_dring_info(dp->dring_handle, &minfo)) != 0) {
192 		DERR(vswp, "vsw_create_rx_dring(%lld): dring info failed\n",
193 		    ldcp->ldc_id);
194 		goto fail;
195 	} else {
196 		ASSERT(minfo.vaddr != 0);
197 		dp->pub_addr = minfo.vaddr;
198 	}
199 
200 	dp->num_descriptors = vsw_num_descriptors;
201 	dp->descriptor_size = sizeof (vnet_rx_dringdata_desc_t);
202 	dp->options = VIO_RX_DRING_DATA;
203 	dp->dring_ncookies = 1;	/* guaranteed by ldc */
204 	dp->num_bufs = vsw_num_descriptors * vsw_nrbufs_factor;
205 
206 	/*
207 	 * Allocate a table that maps descriptor to its associated buffer;
208 	 * used while receiving to validate that the peer has not changed the
209 	 * buffer offset provided in the descriptor.
210 	 */
211 	dp->rxdp_to_vmp = kmem_zalloc(dp->num_descriptors * sizeof (uintptr_t),
212 	    KM_SLEEP);
213 
214 	/* Setup the descriptor ring */
215 	if (vsw_setup_rx_dring(ldcp, dp)) {
216 		DERR(vswp, "%s: unable to setup ring", __func__);
217 		goto fail;
218 	}
219 
220 	/*
221 	 * The descriptors and the associated buffers are all ready;
222 	 * now bind descriptor ring to the channel.
223 	 */
224 	if ((ldc_mem_dring_bind(ldcp->ldc_handle, dp->dring_handle,
225 	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
226 	    &dp->dring_cookie[0], &dp->dring_ncookies)) != 0) {
227 		DERR(vswp, "vsw_create_rx_dring: unable to bind to channel "
228 		    "%lld", ldcp->ldc_id);
229 		goto fail;
230 	}
231 
232 	/* haven't used any descriptors yet */
233 	dp->end_idx = 0;
234 	dp->last_ack_recv = -1;
235 	dp->next_rxi = 0;
236 	return (dp);
237 
238 fail:
239 	vsw_destroy_rx_dring(ldcp);
240 	return (NULL);
241 }
242 
243 /*
244  * Setup the descriptors in the rx dring.
245  * Returns 0 on success, 1 on failure.
246  */
247 static int
248 vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp)
249 {
250 	int				i;
251 	int				rv;
252 	size_t				data_sz;
253 	vio_mblk_t			*vmp;
254 	vio_mblk_t			**rxdp_to_vmp;
255 	vnet_rx_dringdata_desc_t	*rxdp;
256 	vnet_rx_dringdata_desc_t	*pub_addr;
257 	vsw_t				*vswp = ldcp->ldc_vswp;
258 	uint32_t			ncookies = 0;
259 	static char			*name = "vsw_setup_rx_dring";
260 	void				*data_addr = NULL;
261 
262 	/*
263 	 * Allocate a single large buffer that serves as the rx buffer area.
264 	 * We allocate a ldc memory handle and export the buffer area as shared
265 	 * memory. We send the ldc memcookie for this buffer space to the peer,
266 	 * as part of dring registration phase during handshake. We manage this
267 	 * buffer area as individual buffers of max_frame_size and provide
268 	 * specific buffer offsets in each descriptor to the peer. Note that
269 	 * the factor used to compute the # of buffers (above) must be > 1 to
270 	 * ensure that there are more buffers than the # of descriptors. This
271 	 * is needed because, while the shared memory buffers are sent up our
272 	 * stack during receive, the sender needs additional buffers that can
273 	 * be used for further transmits. This also means there is no one to
274 	 * one correspondence between the descriptor index and buffer offset.
275 	 * The sender has to read the buffer offset in the descriptor and use
276 	 * the specified offset to copy the tx data into the shared buffer. We
277 	 * (receiver) manage the individual buffers and their state (see
278 	 * VIO_MBLK_STATEs in vio_util.h).
279 	 */
280 	data_sz = vswp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
281 	data_sz = VNET_ROUNDUP_2K(data_sz);
282 
283 	dp->desc_data_sz = data_sz;
284 	dp->data_sz = (dp->num_bufs * data_sz);
285 	data_addr = kmem_zalloc(dp->data_sz, KM_SLEEP);
286 	dp->data_addr = data_addr;
287 
288 	D2(vswp, "%s: allocated %lld bytes at 0x%llx\n", name,
289 	    dp->data_sz, dp->data_addr);
290 
291 	/* Allocate a ldc memhandle for the entire rx data area */
292 	rv = ldc_mem_alloc_handle(ldcp->ldc_handle, &dp->data_handle);
293 	if (rv != 0) {
294 		DERR(vswp, "%s: alloc mem handle failed", name);
295 		goto fail;
296 	}
297 
298 	/* Allocate memory for the data cookies */
299 	dp->data_cookie = kmem_zalloc(VNET_DATA_AREA_COOKIES *
300 	    sizeof (ldc_mem_cookie_t), KM_SLEEP);
301 
302 	/*
303 	 * Bind ldc memhandle to the corresponding rx data area.
304 	 */
305 	rv = ldc_mem_bind_handle(dp->data_handle, (caddr_t)data_addr,
306 	    dp->data_sz, LDC_DIRECT_MAP, LDC_MEM_W,
307 	    dp->data_cookie, &ncookies);
308 	if (rv != 0) {
309 		DERR(vswp, "%s(%lld): ldc_mem_bind_handle failed "
310 		    "(rv %d)", name, ldcp->ldc_id, rv);
311 		goto fail;
312 	}
313 	if ((ncookies == 0) || (ncookies > VNET_DATA_AREA_COOKIES)) {
314 		goto fail;
315 	}
316 	dp->data_ncookies = ncookies;
317 
318 	/*
319 	 * Successful in binding the handle to rx data area. Now setup mblks
320 	 * around each data buffer and setup the descriptors to point to these
321 	 * rx data buffers. We associate each descriptor with a buffer
322 	 * by specifying the buffer offset in the descriptor. When the peer
323 	 * needs to transmit data, this offset is read by the peer to determine
324 	 * the buffer in the mapped buffer area where the data to be
325 	 * transmitted should be copied, for a specific descriptor.
326 	 */
327 	rv = vio_create_mblks(dp->num_bufs, data_sz, (uint8_t *)data_addr,
328 	    &dp->rx_vmp);
329 	if (rv != 0) {
330 		goto fail;
331 	}
332 
333 	pub_addr = dp->pub_addr;
334 	rxdp_to_vmp = dp->rxdp_to_vmp;
335 	for (i = 0; i < dp->num_descriptors; i++) {
336 		rxdp = &pub_addr[i];
337 		/* allocate an mblk around this data buffer */
338 		vmp = vio_allocb(dp->rx_vmp);
339 		ASSERT(vmp != NULL);
340 		rxdp->data_buf_offset = VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN;
341 		rxdp->dstate = VIO_DESC_FREE;
342 		rxdp_to_vmp[i] = vmp;
343 	}
344 
345 	return (0);
346 
347 fail:
348 	/* return failure; caller will cleanup */
349 	return (1);
350 }
351 
352 /*
353  * Free receive resources for the channel.
354  */
355 void
356 vsw_destroy_rx_dring(vsw_ldc_t *ldcp)
357 {
358 	vsw_t		*vswp = ldcp->ldc_vswp;
359 	lane_t		*lp = &ldcp->lane_out;
360 	dring_info_t	*dp;
361 
362 	dp = lp->dringp;
363 	if (dp == NULL) {
364 		return;
365 	}
366 
367 	mutex_enter(&dp->dlock);
368 
369 	if (dp->rx_vmp != NULL) {
370 		vio_clobber_pool(dp->rx_vmp);
371 		/*
372 		 * If we can't destroy the rx pool for this channel, dispatch a
373 		 * task to retry and clean up those rx pools. Note that we
374 		 * don't need to wait for the task to complete. If the vsw
375 		 * device itself gets detached (vsw_detach()), it will wait for
376 		 * the task to complete implicitly in ddi_taskq_destroy().
377 		 */
378 		if (vio_destroy_mblks(dp->rx_vmp) != 0)  {
379 			(void) ddi_taskq_dispatch(vswp->rxp_taskq,
380 			    vsw_destroy_rxpools, dp->rx_vmp, DDI_SLEEP);
381 		}
382 	}
383 
384 	/* Free rx data area cookies */
385 	if (dp->data_cookie != NULL) {
386 		kmem_free(dp->data_cookie, VNET_DATA_AREA_COOKIES *
387 		    sizeof (ldc_mem_cookie_t));
388 		dp->data_cookie = NULL;
389 	}
390 
391 	/* Unbind rx data area memhandle */
392 	if (dp->data_ncookies != 0) {
393 		(void) ldc_mem_unbind_handle(dp->data_handle);
394 		dp->data_ncookies = 0;
395 	}
396 
397 	/* Free rx data area memhandle */
398 	if (dp->data_handle) {
399 		(void) ldc_mem_free_handle(dp->data_handle);
400 		dp->data_handle = 0;
401 	}
402 
403 	/* Now free the rx data area itself */
404 	if (dp->data_addr != NULL) {
405 		kmem_free(dp->data_addr, dp->data_sz);
406 	}
407 
408 	/* Finally, free the receive descriptor ring */
409 	if (dp->dring_handle != NULL) {
410 		(void) ldc_mem_dring_unbind(dp->dring_handle);
411 		(void) ldc_mem_dring_destroy(dp->dring_handle);
412 	}
413 
414 	if (dp->rxdp_to_vmp != NULL) {
415 		kmem_free(dp->rxdp_to_vmp,
416 		    dp->num_descriptors * sizeof (uintptr_t));
417 		dp->rxdp_to_vmp = NULL;
418 	}
419 
420 	mutex_exit(&dp->dlock);
421 	mutex_destroy(&dp->dlock);
422 	mutex_destroy(&dp->restart_lock);
423 	kmem_free(dp, sizeof (dring_info_t));
424 	lp->dringp = NULL;
425 }
426 
427 /*
428  * Map the receive descriptor ring exported by the peer, as our transmit
429  * descriptor ring.
430  */
431 dring_info_t *
432 vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt)
433 {
434 	int				i;
435 	int				rv;
436 	dring_info_t			*dp;
437 	vnet_rx_dringdata_desc_t	*txdp;
438 	on_trap_data_t			otd;
439 	vio_dring_reg_msg_t		*dring_pkt = pkt;
440 
441 	dp = vsw_map_dring_cmn(ldcp, dring_pkt);
442 	if (dp == NULL) {
443 		return (NULL);
444 	}
445 
446 	/* RxDringData mode specific initializations */
447 	mutex_init(&dp->txlock, NULL, MUTEX_DRIVER, NULL);
448 	mutex_init(&dp->restart_lock, NULL, MUTEX_DRIVER, NULL);
449 	dp->next_txi = 0;
450 	dp->restart_reqd = B_TRUE;
451 	ldcp->dringdata_msgid = 0;
452 	ldcp->lane_in.dringp = dp;
453 
454 	/*
455 	 * Mark the descriptor state as 'done'. This is implementation specific
456 	 * and not required by the protocol. In our implementation, we only
457 	 * need the descripor to be in 'done' state to be used by the transmit
458 	 * function and the peer is not aware of it. As the protocol requires
459 	 * that during initial registration the exporting end point mark the
460 	 * dstate as 'free', we change it 'done' here. After this, the dstate
461 	 * in our implementation will keep moving between 'ready', set by our
462 	 * transmit function; and and 'done', set by the peer (per protocol)
463 	 * after receiving data.
464 	 * Setup on_trap() protection before accessing dring shared memory area.
465 	 */
466 	rv = LDC_ON_TRAP(&otd);
467 	if (rv != 0) {
468 		/*
469 		 * Data access fault occured down the code path below while
470 		 * accessing the descriptors. Return failure.
471 		 */
472 		goto fail;
473 	}
474 
475 	txdp = (vnet_rx_dringdata_desc_t *)dp->pub_addr;
476 	for (i = 0; i < dp->num_descriptors; i++) {
477 		txdp[i].dstate = VIO_DESC_DONE;
478 	}
479 
480 	(void) LDC_NO_TRAP();
481 
482 	return (dp);
483 
484 fail:
485 	if (dp->dring_handle != NULL) {
486 		(void) ldc_mem_dring_unmap(dp->dring_handle);
487 	}
488 	kmem_free(dp, sizeof (*dp));
489 	return (NULL);
490 }
491 
492 /*
493  * Unmap the transmit descriptor ring.
494  */
495 void
496 vsw_unmap_tx_dring(vsw_ldc_t *ldcp)
497 {
498 	lane_t		*lp = &ldcp->lane_in;
499 	dring_info_t	*dp;
500 
501 	if ((dp = lp->dringp) == NULL) {
502 		return;
503 	}
504 
505 	/* Unmap tx data area and free data handle */
506 	if (dp->data_handle != NULL) {
507 		(void) ldc_mem_unmap(dp->data_handle);
508 		(void) ldc_mem_free_handle(dp->data_handle);
509 		dp->data_handle = NULL;
510 	}
511 
512 	/* Free tx data area cookies */
513 	if (dp->data_cookie != NULL) {
514 		kmem_free(dp->data_cookie, dp->data_ncookies *
515 		    sizeof (ldc_mem_cookie_t));
516 		dp->data_cookie = NULL;
517 		dp->data_ncookies = 0;
518 	}
519 
520 	/* Unmap peer's dring */
521 	if (dp->dring_handle != NULL) {
522 		(void) ldc_mem_dring_unmap(dp->dring_handle);
523 		dp->dring_handle = NULL;
524 	}
525 
526 	mutex_destroy(&dp->txlock);
527 	kmem_free(dp, sizeof (dring_info_t));
528 	lp->dringp = NULL;
529 }
530 
531 /*
532  * A per LDC worker thread to process the rx dring and receive packets. This
533  * thread is woken up by the LDC interrupt handler when a dring data info
534  * message is received.
535  */
536 void
537 vsw_ldc_rcv_worker(void *arg)
538 {
539 	callb_cpr_t	cprinfo;
540 	vsw_ldc_t	*ldcp = (vsw_ldc_t *)arg;
541 	vsw_t		*vswp = ldcp->ldc_vswp;
542 
543 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
544 	CALLB_CPR_INIT(&cprinfo, &ldcp->rcv_thr_lock, callb_generic_cpr,
545 	    "vsw_rcv_thread");
546 	mutex_enter(&ldcp->rcv_thr_lock);
547 	while (!(ldcp->rcv_thr_flags & VSW_WTHR_STOP)) {
548 
549 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
550 		/*
551 		 * Wait until the data is received or a stop
552 		 * request is received.
553 		 */
554 		while (!(ldcp->rcv_thr_flags &
555 		    (VSW_WTHR_DATARCVD | VSW_WTHR_STOP))) {
556 			cv_wait(&ldcp->rcv_thr_cv, &ldcp->rcv_thr_lock);
557 		}
558 		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->rcv_thr_lock)
559 
560 		/*
561 		 * First process the stop request.
562 		 */
563 		if (ldcp->rcv_thr_flags & VSW_WTHR_STOP) {
564 			D2(vswp, "%s(%lld):Rx thread stopped\n",
565 			    __func__, ldcp->ldc_id);
566 			break;
567 		}
568 		ldcp->rcv_thr_flags &= ~VSW_WTHR_DATARCVD;
569 		mutex_exit(&ldcp->rcv_thr_lock);
570 		D1(vswp, "%s(%lld):calling vsw_process_pkt\n",
571 		    __func__, ldcp->ldc_id);
572 		vsw_ldc_rcv_shm(ldcp);
573 		mutex_enter(&ldcp->rcv_thr_lock);
574 	}
575 
576 	/*
577 	 * Update the run status and wakeup the thread that
578 	 * has sent the stop request.
579 	 */
580 	ldcp->rcv_thr_flags &= ~VSW_WTHR_STOP;
581 	ldcp->rcv_thread = NULL;
582 	CALLB_CPR_EXIT(&cprinfo);
583 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
584 	thread_exit();
585 }
586 
587 /*
588  * Process the rx descriptor ring in the context of receive worker
589  * thread and switch the received packets to their destinations.
590  */
591 static void
592 vsw_ldc_rcv_shm(vsw_ldc_t *ldcp)
593 {
594 	int		rv;
595 	uint32_t	end_ix;
596 	vio_dring_msg_t msg;
597 	vio_dring_msg_t	*msgp = &msg;
598 	int		count = 0;
599 	int		total_count = 0;
600 	uint32_t	retries = 0;
601 	mblk_t		*bp = NULL;
602 	mblk_t		*bpt = NULL;
603 	mblk_t		*mp = NULL;
604 	vsw_t		*vswp = ldcp->ldc_vswp;
605 	lane_t		*lp = &ldcp->lane_out;
606 	dring_info_t	*dp = lp->dringp;
607 
608 	do {
609 again:
610 		rv = vsw_receive_packet(ldcp, &mp);
611 		if (rv != 0) {
612 			if (rv == EINVAL) {
613 				/* Invalid descriptor error; get next */
614 				continue;
615 			}
616 			if (rv != EAGAIN) {
617 				break;
618 			}
619 
620 			/* Descriptor not ready for processsing */
621 			if (retries == vsw_recv_retries) {
622 				DTRACE_PROBE1(vsw_noready_rxds,
623 				    vsw_ldc_t *, ldcp);
624 				break;
625 			}
626 
627 			/* Switch packets received so far before retrying */
628 			if (bp != NULL) {
629 				VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
630 				    total_count);
631 			}
632 			retries++;
633 			drv_usecwait(vsw_recv_delay);
634 			goto again;
635 		}
636 		retries = 0;
637 
638 		/* Build a chain of received packets */
639 		if (bp == NULL) {
640 			/* first pkt */
641 			bp = mp;
642 			bpt = bp;
643 			bpt->b_next = NULL;
644 		} else {
645 			mp->b_next = NULL;
646 			bpt->b_next = mp;
647 			bpt = mp;
648 		}
649 
650 		total_count++;
651 		count++;
652 
653 		/*
654 		 * If we have gathered vsw_chain_len (tunable)
655 		 * # of packets in the chain, switch them.
656 		 */
657 		if (count == vsw_chain_len) {
658 			VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
659 			    total_count);
660 		}
661 
662 		/*
663 		 * Stop further processing if we processed the entire dring
664 		 * once; otherwise continue.
665 		 */
666 	} while (total_count < dp->num_bufs);
667 
668 	DTRACE_PROBE2(vsw_rx_total_count, vsw_ldc_t *, ldcp,
669 	    int, (total_count));
670 	if (bp != NULL) {
671 		VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
672 		    total_count);
673 	}
674 
675 	/* Send stopped signal to peer (sender) */
676 	end_ix = lp->dringp->next_rxi;
677 	DECR_RXI(dp, end_ix);
678 	msgp->tag.vio_msgtype = VIO_TYPE_DATA;
679 	msgp->tag.vio_subtype = VIO_SUBTYPE_ACK;
680 	msgp->tag.vio_subtype_env = VIO_DRING_DATA;
681 	msgp->dring_ident = ldcp->lane_in.dringp->ident;
682 	msgp->tag.vio_sid = ldcp->local_session;
683 	msgp->dring_process_state = VIO_DP_STOPPED;
684 	msgp->start_idx = VNET_START_IDX_UNSPEC;
685 	msgp->end_idx = end_ix;
686 
687 	(void) vsw_send_msg_shm(ldcp, (void *)msgp,
688 	    sizeof (vio_dring_msg_t), B_TRUE);
689 
690 	ldcp->ldc_stats.dring_data_acks_sent++;
691 	ldcp->ldc_stats.dring_stopped_acks_sent++;
692 }
693 
694 /*
695  * Process the next index in the rx dring and receive the associated packet.
696  *
697  * Returns:
698  *	bp:	Success: The received packet.
699  *		Failure: NULL
700  *      retval:
701  *		Success: 0
702  *		Failure: EAGAIN: Descriptor not ready
703  *			 EIO:    Descriptor contents invalid.
704  */
705 static int
706 vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp)
707 {
708 	uint32_t			rxi;
709 	vio_mblk_t			*vmp;
710 	vio_mblk_t			*new_vmp;
711 	struct ether_header		*ehp;
712 	vnet_rx_dringdata_desc_t	*rxdp;
713 	int				err = 0;
714 	uint_t				nbytes = 0;
715 	mblk_t				*mp = NULL;
716 	mblk_t				*dmp = NULL;
717 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
718 	dring_info_t			*dp = ldcp->lane_out.dringp;
719 	vnet_rx_dringdata_desc_t	*pub_addr = dp->pub_addr;
720 
721 	rxi = dp->next_rxi;
722 	rxdp = &(pub_addr[rxi]);
723 	vmp = dp->rxdp_to_vmp[rxi];
724 
725 	if (rxdp->dstate != VIO_DESC_READY) {
726 		/*
727 		 * Descriptor is not ready.
728 		 */
729 		return (EAGAIN);
730 	}
731 
732 	/*
733 	 * Ensure load ordering of dstate and nbytes.
734 	 */
735 	MEMBAR_CONSUMER();
736 
737 	if ((rxdp->nbytes < ETHERMIN) ||
738 	    (rxdp->nbytes > ldcp->lane_in.mtu) ||
739 	    (rxdp->data_buf_offset !=
740 	    (VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN))) {
741 		/*
742 		 * Descriptor contents invalid.
743 		 */
744 		statsp->ierrors++;
745 		rxdp->dstate = VIO_DESC_DONE;
746 		err = EIO;
747 		goto done;
748 	}
749 
750 	/*
751 	 * Now allocate a new buffer for this descriptor before sending up the
752 	 * buffer being processed. If that fails, stop processing; as we are
753 	 * out of receive buffers.
754 	 */
755 	new_vmp = vio_allocb(dp->rx_vmp);
756 
757 	/*
758 	 * Process the current buffer being received.
759 	 */
760 	nbytes = rxdp->nbytes;
761 	mp = vmp->mp;
762 
763 	if (new_vmp == NULL) {
764 		/*
765 		 * We failed to get a new mapped buffer that is needed to
766 		 * refill the descriptor. In that case, leave the current
767 		 * buffer bound to the descriptor; allocate an mblk dynamically
768 		 * and copy the contents of the buffer to the mblk. Then send
769 		 * up this mblk. This way the sender has the same buffer as
770 		 * before that can be used to send new data.
771 		 */
772 		statsp->norcvbuf++;
773 		dmp = allocb(nbytes + VNET_IPALIGN, BPRI_MED);
774 		bcopy(mp->b_rptr + VNET_IPALIGN,
775 		    dmp->b_rptr + VNET_IPALIGN, nbytes);
776 		mp = dmp;
777 	} else {
778 		/* Mark the status of the current rbuf */
779 		vmp->state = VIO_MBLK_HAS_DATA;
780 
781 		/* Set the offset of the new buffer in the descriptor */
782 		rxdp->data_buf_offset =
783 		    VIO_MBLK_DATA_OFF(new_vmp) + VNET_IPALIGN;
784 		dp->rxdp_to_vmp[rxi] = new_vmp;
785 	}
786 	mp->b_rptr += VNET_IPALIGN;
787 	mp->b_wptr = mp->b_rptr + nbytes;
788 
789 	/*
790 	 * Ensure store ordering of data_buf_offset and dstate; so that the
791 	 * peer sees the right data_buf_offset after it checks that the dstate
792 	 * is DONE.
793 	 */
794 	MEMBAR_PRODUCER();
795 
796 	/* Now mark the descriptor 'done' */
797 	rxdp->dstate = VIO_DESC_DONE;
798 
799 	/* Update stats */
800 	statsp->ipackets++;
801 	statsp->rbytes += rxdp->nbytes;
802 	ehp = (struct ether_header *)mp->b_rptr;
803 	if (IS_BROADCAST(ehp))
804 		statsp->brdcstrcv++;
805 	else if (IS_MULTICAST(ehp))
806 		statsp->multircv++;
807 done:
808 	/* Update the next index to be processed */
809 	INCR_RXI(dp, rxi);
810 
811 	/* Save the new recv index */
812 	dp->next_rxi = rxi;
813 
814 	/* Return the packet received */
815 	*bp = mp;
816 	return (err);
817 }
818 
819 void
820 vsw_stop_rcv_thread(vsw_ldc_t *ldcp)
821 {
822 	kt_did_t	tid = 0;
823 	vsw_t		*vswp = ldcp->ldc_vswp;
824 
825 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
826 	/*
827 	 * Send a stop request by setting the stop flag and
828 	 * wait until the rcv process thread stops.
829 	 */
830 	mutex_enter(&ldcp->rcv_thr_lock);
831 	if (ldcp->rcv_thread != NULL) {
832 		tid = ldcp->rcv_thread->t_did;
833 		ldcp->rcv_thr_flags |= VSW_WTHR_STOP;
834 		cv_signal(&ldcp->rcv_thr_cv);
835 	}
836 	mutex_exit(&ldcp->rcv_thr_lock);
837 
838 	if (tid != 0) {
839 		thread_join(tid);
840 	}
841 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
842 }
843 
844 int
845 vsw_dringsend_shm(vsw_ldc_t *ldcp, mblk_t *mp)
846 {
847 	uint32_t			next_txi;
848 	uint32_t			txi;
849 	vnet_rx_dringdata_desc_t	*txdp;
850 	vnet_rx_dringdata_desc_t	*ntxdp;
851 	struct ether_header		*ehp;
852 	size_t				mblksz;
853 	caddr_t				dst;
854 	mblk_t				*bp;
855 	size_t				size;
856 	on_trap_data_t			otd;
857 	uint32_t			buf_offset;
858 	vnet_rx_dringdata_desc_t	*pub_addr;
859 	vio_dring_msg_t			msg;
860 	vio_dring_msg_t			*msgp = &msg;
861 	int				rv = 0;
862 	boolean_t			resched_peer = B_FALSE;
863 	boolean_t			is_bcast = B_FALSE;
864 	boolean_t			is_mcast = B_FALSE;
865 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
866 	lane_t				*lane_in = &ldcp->lane_in;
867 	lane_t				*lane_out = &ldcp->lane_out;
868 	dring_info_t			*dp = lane_in->dringp;
869 	vsw_t				*vswp = ldcp->ldc_vswp;
870 
871 	if ((!(lane_in->lstate & VSW_LANE_ACTIVE)) ||
872 	    (ldcp->ldc_status != LDC_UP) || (ldcp->ldc_handle == NULL)) {
873 		DWARN(vswp, "%s(%lld) status(%d) lstate(0x%llx), dropping "
874 		    "packet\n", __func__, ldcp->ldc_id, ldcp->ldc_status,
875 		    lane_in->lstate);
876 		statsp->oerrors++;
877 		return (LDC_TX_FAILURE);
878 	}
879 
880 	if (dp == NULL) {
881 		DERR(vswp, "%s(%lld): no dring for outbound lane on"
882 		    " channel %d", __func__, ldcp->ldc_id, ldcp->ldc_id);
883 		statsp->oerrors++;
884 		return (LDC_TX_FAILURE);
885 	}
886 	pub_addr = dp->pub_addr;
887 
888 	size = msgsize(mp);
889 
890 	/*
891 	 * Note: In RxDringData mode, lane_in is associated with transmit and
892 	 * lane_out is associated with receive. However, we still keep the
893 	 * negotiated mtu in lane_out (our exported attributes).
894 	 */
895 	if (size > (size_t)lane_out->mtu) {
896 		DERR(vswp, "%s(%lld) invalid size (%ld)\n", __func__,
897 		    ldcp->ldc_id, size);
898 		statsp->oerrors++;
899 		return (LDC_TX_FAILURE);
900 	}
901 
902 	if (size < ETHERMIN)
903 		size = ETHERMIN;
904 
905 	ehp = (struct ether_header *)mp->b_rptr;
906 	is_bcast = IS_BROADCAST(ehp);
907 	is_mcast = IS_MULTICAST(ehp);
908 
909 	/*
910 	 * Setup on_trap() protection before accessing shared memory areas
911 	 * (descriptor and data buffer). Note that we enable this protection a
912 	 * little early and turn it off slightly later, than keeping it enabled
913 	 * strictly at the points in code below where the descriptor and data
914 	 * buffer are accessed. This is done for performance reasons:
915 	 * (a) to avoid calling the trap protection code while holding mutex.
916 	 * (b) to avoid multiple on/off steps for descriptor and data accesses.
917 	 */
918 	rv = LDC_ON_TRAP(&otd);
919 	if (rv != 0) {
920 		/*
921 		 * Data access fault occured down the code path below while
922 		 * accessing either the descriptor or the data buffer. Release
923 		 * any locks that we might have acquired in the code below and
924 		 * return failure.
925 		 */
926 		DERR(vswp, "%s(%lld) data access fault occured\n",
927 		    __func__, ldcp->ldc_id);
928 		statsp->oerrors++;
929 		if (mutex_owned(&dp->txlock)) {
930 			mutex_exit(&dp->txlock);
931 		}
932 		if (mutex_owned(&dp->restart_lock)) {
933 			mutex_exit(&dp->restart_lock);
934 		}
935 		goto dringsend_shm_exit;
936 	}
937 
938 	/*
939 	 * Allocate a descriptor
940 	 */
941 	mutex_enter(&dp->txlock);
942 	txi = next_txi = dp->next_txi;
943 	INCR_TXI(dp, next_txi);
944 	ntxdp = &(pub_addr[next_txi]);
945 	if (ntxdp->dstate != VIO_DESC_DONE) { /* out of descriptors */
946 		statsp->tx_no_desc++;
947 		mutex_exit(&dp->txlock);
948 		(void) LDC_NO_TRAP();
949 		return (LDC_TX_NORESOURCES);
950 	}
951 
952 	/* Update descriptor ring index */
953 	dp->next_txi = next_txi;
954 	mutex_exit(&dp->txlock);
955 
956 	/* Access the descriptor */
957 	txdp = &(pub_addr[txi]);
958 
959 	/* Ensure load ordering of dstate (above) and data_buf_offset. */
960 	MEMBAR_CONSUMER();
961 
962 	/* Get the offset of the buffer to be used */
963 	buf_offset = txdp->data_buf_offset;
964 
965 	/* Access the buffer using the offset */
966 	dst = (caddr_t)dp->data_addr + buf_offset;
967 
968 	/* Copy data into mapped transmit buffer */
969 	for (bp = mp; bp != NULL; bp = bp->b_cont) {
970 		mblksz = MBLKL(bp);
971 		bcopy(bp->b_rptr, dst, mblksz);
972 		dst += mblksz;
973 	}
974 
975 	/* Set the size of data in the descriptor */
976 	txdp->nbytes = size;
977 
978 	/*
979 	 * Ensure store ordering of nbytes and dstate (below); so that the peer
980 	 * sees the right nbytes value after it checks that the dstate is READY.
981 	 */
982 	MEMBAR_PRODUCER();
983 
984 	mutex_enter(&dp->restart_lock);
985 
986 	/* Mark the descriptor ready */
987 	txdp->dstate = VIO_DESC_READY;
988 
989 	/* Check if peer needs wake up (handled below) */
990 	if (dp->restart_reqd == B_TRUE) {
991 		dp->restart_reqd = B_FALSE;
992 		resched_peer = B_TRUE;
993 	}
994 
995 	/* Update tx stats */
996 	statsp->opackets++;
997 	statsp->obytes += size;
998 	if (is_bcast)
999 		statsp->brdcstxmt++;
1000 	else if (is_mcast)
1001 		statsp->multixmt++;
1002 
1003 	mutex_exit(&dp->restart_lock);
1004 
1005 	/*
1006 	 * We are done accessing shared memory; clear trap protection.
1007 	 */
1008 	(void) LDC_NO_TRAP();
1009 
1010 	/*
1011 	 * Need to wake up the peer ?
1012 	 */
1013 	if (resched_peer == B_TRUE) {
1014 		msgp->tag.vio_msgtype = VIO_TYPE_DATA;
1015 		msgp->tag.vio_subtype = VIO_SUBTYPE_INFO;
1016 		msgp->tag.vio_subtype_env = VIO_DRING_DATA;
1017 		msgp->tag.vio_sid = ldcp->local_session;
1018 		msgp->dring_ident = lane_out->dringp->ident;
1019 		msgp->start_idx = txi;
1020 		msgp->end_idx = -1;
1021 
1022 		rv = vsw_send_msg_shm(ldcp, (void *)msgp, sizeof (*msgp),
1023 		    B_FALSE);
1024 		if (rv != 0) {
1025 			/* error: drop the packet */
1026 			DERR(vswp, "%s(%lld) failed sending dringdata msg\n",
1027 			    __func__, ldcp->ldc_id);
1028 			mutex_enter(&dp->restart_lock);
1029 			statsp->oerrors++;
1030 			dp->restart_reqd = B_TRUE;
1031 			mutex_exit(&dp->restart_lock);
1032 		}
1033 		statsp->dring_data_msgs_sent++;
1034 	}
1035 
1036 dringsend_shm_exit:
1037 	if (rv == ECONNRESET || rv == EACCES) {
1038 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1039 	}
1040 	return (LDC_TX_SUCCESS);
1041 }
1042 
1043 void
1044 vsw_process_dringdata_shm(void *arg, void *dpkt)
1045 {
1046 	vsw_ldc_t		*ldcp = arg;
1047 	vsw_t			*vswp = ldcp->ldc_vswp;
1048 	vio_dring_msg_t		*dring_pkt = dpkt;
1049 
1050 	switch (dring_pkt->tag.vio_subtype) {
1051 	case VIO_SUBTYPE_INFO:
1052 		D2(vswp, "%s(%lld): VIO_SUBTYPE_INFO", __func__, ldcp->ldc_id);
1053 		vsw_process_dringdata_info_shm(ldcp, dring_pkt);
1054 		break;
1055 
1056 	case VIO_SUBTYPE_ACK:
1057 		D2(vswp, "%s(%lld): VIO_SUBTYPE_ACK", __func__, ldcp->ldc_id);
1058 		vsw_process_dringdata_ack_shm(ldcp, dring_pkt);
1059 		break;
1060 
1061 	case VIO_SUBTYPE_NACK:
1062 		DWARN(vswp, "%s(%lld): VIO_SUBTYPE_NACK",
1063 		    __func__, ldcp->ldc_id);
1064 		/*
1065 		 * Something is badly wrong if we are getting NACK's
1066 		 * for our data pkts. So reset the channel.
1067 		 */
1068 		vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
1069 		break;
1070 
1071 	default:
1072 		DERR(vswp, "%s(%lld): Unknown vio_subtype %x\n", __func__,
1073 		    ldcp->ldc_id, dring_pkt->tag.vio_subtype);
1074 	}
1075 }
1076 
1077 static void
1078 vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1079 {
1080 	dring_info_t	*dp = ldcp->lane_in.dringp;
1081 	vsw_t		*vswp = ldcp->ldc_vswp;
1082 	vgen_stats_t	*statsp = &ldcp->ldc_stats;
1083 
1084 	if (dp->ident != msg->dring_ident) {
1085 		/* drop the message */
1086 		DERR(vswp, "%s(%lld): Invalid dring ident 0x%llx",
1087 		    __func__, ldcp->ldc_id, msg->dring_ident);
1088 		return;
1089 	}
1090 
1091 	statsp->dring_data_msgs_rcvd++;
1092 
1093 	/*
1094 	 * Wake up the rcv worker thread to process the rx dring.
1095 	 */
1096 	ASSERT(MUTEX_HELD(&ldcp->ldc_cblock));
1097 	mutex_exit(&ldcp->ldc_cblock);
1098 	mutex_enter(&ldcp->rcv_thr_lock);
1099 	if (!(ldcp->rcv_thr_flags & VSW_WTHR_DATARCVD)) {
1100 		ldcp->rcv_thr_flags |= VSW_WTHR_DATARCVD;
1101 		cv_signal(&ldcp->rcv_thr_cv);
1102 	}
1103 	mutex_exit(&ldcp->rcv_thr_lock);
1104 	mutex_enter(&ldcp->ldc_cblock);
1105 }
1106 
1107 static void
1108 vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1109 {
1110 	dring_info_t			*dp;
1111 	uint32_t			start;
1112 	int32_t				end;
1113 	int				rv;
1114 	on_trap_data_t			otd;
1115 	uint32_t			txi;
1116 	vnet_rx_dringdata_desc_t	*txdp;
1117 	vnet_rx_dringdata_desc_t	*pub_addr;
1118 	boolean_t			ready_txd = B_FALSE;
1119 	vsw_t				*vswp = ldcp->ldc_vswp;
1120 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
1121 
1122 	dp = ldcp->lane_in.dringp;
1123 	start = msg->start_idx;
1124 	end = msg->end_idx;
1125 	pub_addr = dp->pub_addr;
1126 
1127 	/*
1128 	 * In RxDringData mode (v1.6), start index of -1 can be used by the
1129 	 * peer to indicate that it is unspecified. However, the end index
1130 	 * must be set correctly indicating the last descriptor index processed.
1131 	 */
1132 	if (((start != VNET_START_IDX_UNSPEC) && !(CHECK_TXI(dp, start))) ||
1133 	    !(CHECK_TXI(dp, end))) {
1134 		/* drop the message if invalid index */
1135 		DWARN(vswp, "%s(%lld): Invalid Tx ack start(%d) or end(%d)\n",
1136 		    __func__, ldcp->ldc_id, start, end);
1137 		return;
1138 	}
1139 
1140 	/* Validate dring_ident */
1141 	if (msg->dring_ident != ldcp->lane_out.dringp->ident) {
1142 		/* invalid dring_ident, drop the msg */
1143 		DWARN(vswp, "%s(%lld): Invalid dring ident 0x%x\n",
1144 		    __func__, ldcp->ldc_id, msg->dring_ident);
1145 		return;
1146 	}
1147 	statsp->dring_data_acks_rcvd++;
1148 
1149 	if (msg->dring_process_state != VIO_DP_STOPPED) {
1150 		/*
1151 		 * Receiver continued processing
1152 		 * dring after sending us the ack.
1153 		 */
1154 		return;
1155 	}
1156 
1157 	statsp->dring_stopped_acks_rcvd++;
1158 
1159 	/*
1160 	 * Setup on_trap() protection before accessing dring shared memory area.
1161 	 */
1162 	rv = LDC_ON_TRAP(&otd);
1163 	if (rv != 0) {
1164 		/*
1165 		 * Data access fault occured down the code path below while
1166 		 * accessing the descriptors. Release any locks that we might
1167 		 * have acquired in the code below and return failure.
1168 		 */
1169 		if (mutex_owned(&dp->restart_lock)) {
1170 			mutex_exit(&dp->restart_lock);
1171 		}
1172 		return;
1173 	}
1174 
1175 	/*
1176 	 * Determine if there are any pending tx descriptors ready to be
1177 	 * processed by the receiver(peer) and if so, send a message to the
1178 	 * peer to restart receiving.
1179 	 */
1180 	mutex_enter(&dp->restart_lock);
1181 
1182 	ready_txd = B_FALSE;
1183 	txi = end;
1184 	INCR_TXI(dp, txi);
1185 	txdp = &pub_addr[txi];
1186 	if (txdp->dstate == VIO_DESC_READY) {
1187 		ready_txd = B_TRUE;
1188 	}
1189 
1190 	/*
1191 	 * We are done accessing shared memory; clear trap protection.
1192 	 */
1193 	(void) LDC_NO_TRAP();
1194 
1195 	if (ready_txd == B_FALSE) {
1196 		/*
1197 		 * No ready tx descriptors. Set the flag to send a message to
1198 		 * the peer when tx descriptors are ready in transmit routine.
1199 		 */
1200 		dp->restart_reqd = B_TRUE;
1201 		mutex_exit(&dp->restart_lock);
1202 		return;
1203 	}
1204 
1205 	/*
1206 	 * We have some tx descriptors ready to be processed by the receiver.
1207 	 * Send a dring data message to the peer to restart processing.
1208 	 */
1209 	dp->restart_reqd = B_FALSE;
1210 	mutex_exit(&dp->restart_lock);
1211 
1212 	msg->tag.vio_msgtype = VIO_TYPE_DATA;
1213 	msg->tag.vio_subtype = VIO_SUBTYPE_INFO;
1214 	msg->tag.vio_subtype_env = VIO_DRING_DATA;
1215 	msg->tag.vio_sid = ldcp->local_session;
1216 	msg->dring_ident = ldcp->lane_out.dringp->ident;
1217 	msg->start_idx = txi;
1218 	msg->end_idx = -1;
1219 	rv = vsw_send_msg_shm(ldcp, (void *)msg,
1220 	    sizeof (vio_dring_msg_t), B_FALSE);
1221 	statsp->dring_data_msgs_sent++;
1222 	if (rv != 0) {
1223 		mutex_enter(&dp->restart_lock);
1224 		dp->restart_reqd = B_TRUE;
1225 		mutex_exit(&dp->restart_lock);
1226 	}
1227 
1228 	if (rv == ECONNRESET) {
1229 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1230 	}
1231 }
1232 
1233 /*
1234  * Send dring data msgs (info/ack/nack) over LDC.
1235  */
1236 int
1237 vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size, boolean_t handle_reset)
1238 {
1239 	int			rv;
1240 	int			retries = vsw_wretries;
1241 	size_t			msglen = size;
1242 	vsw_t			*vswp = ldcp->ldc_vswp;
1243 	vio_dring_msg_t		*dmsg = (vio_dring_msg_t *)msgp;
1244 
1245 	D1(vswp, "vsw_send_msg (%lld) enter : sending %d bytes",
1246 	    ldcp->ldc_id, size);
1247 
1248 	dmsg->seq_num = atomic_inc_32_nv(&ldcp->dringdata_msgid);
1249 
1250 	do {
1251 		msglen = size;
1252 		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msgp, &msglen);
1253 	} while (rv == EWOULDBLOCK && --retries > 0);
1254 
1255 	if ((rv != 0) || (msglen != size)) {
1256 		DERR(vswp, "vsw_send_msg_shm:ldc_write failed: "
1257 		    "chan(%lld) rv(%d) size (%d) msglen(%d)\n",
1258 		    ldcp->ldc_id, rv, size, msglen);
1259 		ldcp->ldc_stats.oerrors++;
1260 	}
1261 
1262 	/*
1263 	 * If channel has been reset we either handle it here or
1264 	 * simply report back that it has been reset and let caller
1265 	 * decide what to do.
1266 	 */
1267 	if (rv == ECONNRESET) {
1268 		DWARN(vswp, "%s (%lld) channel reset", __func__, ldcp->ldc_id);
1269 
1270 		if (handle_reset) {
1271 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1272 		}
1273 	}
1274 
1275 	return (rv);
1276 }
1277