xref: /titanic_51/usr/src/uts/sun4v/io/vsw_rxdring.c (revision 65f385795231fc5a4e021acc26afd73365a3b5ab)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/errno.h>
28 #include <sys/sysmacros.h>
29 #include <sys/param.h>
30 #include <sys/machsystm.h>
31 #include <sys/stream.h>
32 #include <sys/strsubr.h>
33 #include <sys/kmem.h>
34 #include <sys/strsun.h>
35 #include <sys/callb.h>
36 #include <sys/sdt.h>
37 #include <sys/mach_descrip.h>
38 #include <sys/mdeg.h>
39 #include <net/if.h>
40 #include <sys/vsw.h>
41 #include <sys/vio_mailbox.h>
42 #include <sys/vio_common.h>
43 #include <sys/vnet_common.h>
44 #include <sys/vnet_mailbox.h>
45 #include <sys/vio_util.h>
46 
47 /*
48  * This file contains the implementation of RxDringData transfer mode of VIO
49  * Protocol in vsw. The functions in this file are invoked from vsw_ldc.c
50  * after RxDringData mode is negotiated with the peer during attribute phase of
51  * handshake. This file contains functions that setup the transmit and receive
52  * descriptor rings, and associated resources in RxDringData mode. It also
53  * contains the transmit and receive data processing functions that are invoked
54  * in RxDringData mode. The data processing routines in this file have the
55  * suffix '_shm' to indicate the shared memory mechanism used in RxDringData
56  * mode.
57  */
58 
59 /* Functions exported to vsw_ldc.c */
60 vio_dring_reg_msg_t *vsw_create_rx_dring_info(vsw_ldc_t *);
61 void vsw_destroy_rx_dring(vsw_ldc_t *ldcp);
62 dring_info_t *vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt);
63 void vsw_unmap_tx_dring(vsw_ldc_t *ldcp);
64 int vsw_dringsend_shm(vsw_ldc_t *, mblk_t *);
65 void vsw_ldc_rcv_worker(void *arg);
66 void vsw_stop_rcv_thread(vsw_ldc_t *ldcp);
67 void vsw_process_dringdata_shm(void *, void *);
68 
69 /* Internal functions */
70 static dring_info_t *vsw_create_rx_dring(vsw_ldc_t *);
71 static int vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp);
72 static void vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp,
73 	vio_dring_msg_t *msg);
74 static void vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp,
75 	vio_dring_msg_t *msg);
76 static void vsw_ldc_rcv_shm(vsw_ldc_t *ldcp);
77 static int vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp);
78 static int vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size,
79     boolean_t handle_reset);
80 
81 /* Functions imported from vsw_ldc.c */
82 extern void vsw_process_pkt(void *);
83 extern void vsw_destroy_rxpools(void *);
84 extern dring_info_t *vsw_map_dring_cmn(vsw_ldc_t *ldcp,
85     vio_dring_reg_msg_t *dring_pkt);
86 extern void vsw_process_conn_evt(vsw_ldc_t *, uint16_t);
87 extern mblk_t *vsw_vlan_frame_pretag(void *arg, int type, mblk_t *mp);
88 
89 /* Tunables */
90 extern int vsw_wretries;
91 extern int vsw_recv_delay;
92 extern int vsw_recv_retries;
93 extern uint32_t vsw_chain_len;
94 extern uint32_t vsw_num_descriptors;
95 extern uint32_t vsw_nrbufs_factor;
96 
97 #define	VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count, total_count)	\
98 {									\
99 	DTRACE_PROBE2(vsw_rx_pkts, vsw_ldc_t *, (ldcp), int, (count));	\
100 	(vswp)->vsw_switch_frame((vswp), (bp), VSW_VNETPORT,		\
101 	    (ldcp)->ldc_port, NULL);					\
102 	(bp) = (bpt) = NULL;						\
103 	(count) = 0;							\
104 }
105 
106 vio_dring_reg_msg_t *
107 vsw_create_rx_dring_info(vsw_ldc_t *ldcp)
108 {
109 	vio_dring_reg_msg_t	*mp;
110 	vio_dring_reg_ext_msg_t	*emsg;
111 	dring_info_t		*dp;
112 	uint8_t			*buf;
113 	vsw_t			*vswp = ldcp->ldc_vswp;
114 
115 	D1(vswp, "%s enter\n", __func__);
116 
117 	/*
118 	 * If we can't create a dring, obviously no point sending
119 	 * a message.
120 	 */
121 	if ((dp = vsw_create_rx_dring(ldcp)) == NULL)
122 		return (NULL);
123 
124 	mp = kmem_zalloc(VNET_DRING_REG_EXT_MSG_SIZE(dp->data_ncookies),
125 	    KM_SLEEP);
126 
127 	mp->tag.vio_msgtype = VIO_TYPE_CTRL;
128 	mp->tag.vio_subtype = VIO_SUBTYPE_INFO;
129 	mp->tag.vio_subtype_env = VIO_DRING_REG;
130 	mp->tag.vio_sid = ldcp->local_session;
131 
132 	/* payload */
133 	mp->num_descriptors = dp->num_descriptors;
134 	mp->descriptor_size = dp->descriptor_size;
135 	mp->options = dp->options;
136 	mp->ncookies = dp->dring_ncookies;
137 	bcopy(&dp->dring_cookie[0], &mp->cookie[0],
138 	    sizeof (ldc_mem_cookie_t));
139 
140 	mp->dring_ident = 0;
141 
142 	buf = (uint8_t *)mp->cookie;
143 
144 	/* skip over dring cookies */
145 	ASSERT(mp->ncookies == 1);
146 	buf += (mp->ncookies * sizeof (ldc_mem_cookie_t));
147 
148 	emsg = (vio_dring_reg_ext_msg_t *)buf;
149 
150 	/* copy data_ncookies in the msg */
151 	emsg->data_ncookies = dp->data_ncookies;
152 
153 	/* copy data area size in the msg */
154 	emsg->data_area_size = dp->data_sz;
155 
156 	/* copy data area cookies in the msg */
157 	bcopy(dp->data_cookie, (ldc_mem_cookie_t *)emsg->data_cookie,
158 	    sizeof (ldc_mem_cookie_t) * dp->data_ncookies);
159 
160 	D1(vswp, "%s exit\n", __func__);
161 
162 	return (mp);
163 }
164 
165 /*
166  * Allocate receive resources for the channel. The resources consist of a
167  * receive descriptor ring and an associated receive buffer area.
168  */
169 static dring_info_t *
170 vsw_create_rx_dring(vsw_ldc_t *ldcp)
171 {
172 	vsw_t			*vswp = ldcp->ldc_vswp;
173 	ldc_mem_info_t		minfo;
174 	dring_info_t		*dp;
175 
176 	dp = (dring_info_t *)kmem_zalloc(sizeof (dring_info_t), KM_SLEEP);
177 	mutex_init(&dp->dlock, NULL, MUTEX_DRIVER, NULL);
178 	ldcp->lane_out.dringp = dp;
179 
180 	/* Create the receive descriptor ring */
181 	if ((ldc_mem_dring_create(vsw_num_descriptors,
182 	    sizeof (vnet_rx_dringdata_desc_t), &dp->dring_handle)) != 0) {
183 		DERR(vswp, "vsw_create_rx_dring(%lld): ldc dring create "
184 		    "failed", ldcp->ldc_id);
185 		goto fail;
186 	}
187 
188 	ASSERT(dp->dring_handle != NULL);
189 
190 	/* Get the addr of descriptor ring */
191 	if ((ldc_mem_dring_info(dp->dring_handle, &minfo)) != 0) {
192 		DERR(vswp, "vsw_create_rx_dring(%lld): dring info failed\n",
193 		    ldcp->ldc_id);
194 		goto fail;
195 	} else {
196 		ASSERT(minfo.vaddr != 0);
197 		dp->pub_addr = minfo.vaddr;
198 	}
199 
200 	dp->num_descriptors = vsw_num_descriptors;
201 	dp->descriptor_size = sizeof (vnet_rx_dringdata_desc_t);
202 	dp->options = VIO_RX_DRING_DATA;
203 	dp->dring_ncookies = 1;	/* guaranteed by ldc */
204 	dp->num_bufs = VSW_RXDRING_NRBUFS;
205 
206 	/*
207 	 * Allocate a table that maps descriptor to its associated buffer;
208 	 * used while receiving to validate that the peer has not changed the
209 	 * buffer offset provided in the descriptor.
210 	 */
211 	dp->rxdp_to_vmp = kmem_zalloc(dp->num_descriptors * sizeof (uintptr_t),
212 	    KM_SLEEP);
213 
214 	/* Setup the descriptor ring */
215 	if (vsw_setup_rx_dring(ldcp, dp)) {
216 		DERR(vswp, "%s: unable to setup ring", __func__);
217 		goto fail;
218 	}
219 
220 	/*
221 	 * The descriptors and the associated buffers are all ready;
222 	 * now bind descriptor ring to the channel.
223 	 */
224 	if ((ldc_mem_dring_bind(ldcp->ldc_handle, dp->dring_handle,
225 	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
226 	    &dp->dring_cookie[0], &dp->dring_ncookies)) != 0) {
227 		DERR(vswp, "vsw_create_rx_dring: unable to bind to channel "
228 		    "%lld", ldcp->ldc_id);
229 		goto fail;
230 	}
231 
232 	/* haven't used any descriptors yet */
233 	dp->end_idx = 0;
234 	dp->last_ack_recv = -1;
235 	dp->next_rxi = 0;
236 	return (dp);
237 
238 fail:
239 	vsw_destroy_rx_dring(ldcp);
240 	return (NULL);
241 }
242 
243 /*
244  * Setup the descriptors in the rx dring.
245  * Returns 0 on success, 1 on failure.
246  */
247 static int
248 vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp)
249 {
250 	int				i, j;
251 	int				rv;
252 	size_t				data_sz;
253 	vio_mblk_t			*vmp;
254 	vio_mblk_t			**rxdp_to_vmp;
255 	vnet_rx_dringdata_desc_t	*rxdp;
256 	vnet_rx_dringdata_desc_t	*pub_addr;
257 	vsw_t				*vswp = ldcp->ldc_vswp;
258 	uint32_t			ncookies = 0;
259 	static char			*name = "vsw_setup_rx_dring";
260 	void				*data_addr = NULL;
261 
262 	/*
263 	 * Allocate a single large buffer that serves as the rx buffer area.
264 	 * We allocate a ldc memory handle and export the buffer area as shared
265 	 * memory. We send the ldc memcookie for this buffer space to the peer,
266 	 * as part of dring registration phase during handshake. We manage this
267 	 * buffer area as individual buffers of max_frame_size and provide
268 	 * specific buffer offsets in each descriptor to the peer. Note that
269 	 * the factor used to compute the # of buffers (above) must be > 1 to
270 	 * ensure that there are more buffers than the # of descriptors. This
271 	 * is needed because, while the shared memory buffers are sent up our
272 	 * stack during receive, the sender needs additional buffers that can
273 	 * be used for further transmits. This also means there is no one to
274 	 * one correspondence between the descriptor index and buffer offset.
275 	 * The sender has to read the buffer offset in the descriptor and use
276 	 * the specified offset to copy the tx data into the shared buffer. We
277 	 * (receiver) manage the individual buffers and their state (see
278 	 * VIO_MBLK_STATEs in vio_util.h).
279 	 */
280 	data_sz = RXDRING_DBLK_SZ(vswp->max_frame_size);
281 
282 	dp->desc_data_sz = data_sz;
283 	dp->data_sz = (dp->num_bufs * data_sz);
284 	data_addr = kmem_zalloc(dp->data_sz, KM_SLEEP);
285 	dp->data_addr = data_addr;
286 
287 	D2(vswp, "%s: allocated %lld bytes at 0x%llx\n", name,
288 	    dp->data_sz, dp->data_addr);
289 
290 	/* Allocate a ldc memhandle for the entire rx data area */
291 	rv = ldc_mem_alloc_handle(ldcp->ldc_handle, &dp->data_handle);
292 	if (rv != 0) {
293 		DERR(vswp, "%s: alloc mem handle failed", name);
294 		goto fail;
295 	}
296 
297 	/* Allocate memory for the data cookies */
298 	dp->data_cookie = kmem_zalloc(VNET_DATA_AREA_COOKIES *
299 	    sizeof (ldc_mem_cookie_t), KM_SLEEP);
300 
301 	/*
302 	 * Bind ldc memhandle to the corresponding rx data area.
303 	 */
304 	rv = ldc_mem_bind_handle(dp->data_handle, (caddr_t)data_addr,
305 	    dp->data_sz, LDC_DIRECT_MAP, LDC_MEM_W,
306 	    dp->data_cookie, &ncookies);
307 	if (rv != 0) {
308 		DERR(vswp, "%s(%lld): ldc_mem_bind_handle failed "
309 		    "(rv %d)", name, ldcp->ldc_id, rv);
310 		goto fail;
311 	}
312 	if ((ncookies == 0) || (ncookies > VNET_DATA_AREA_COOKIES)) {
313 		goto fail;
314 	}
315 	dp->data_ncookies = ncookies;
316 
317 	for (j = 1; j < ncookies; j++) {
318 		rv = ldc_mem_nextcookie(dp->data_handle,
319 		    &(dp->data_cookie[j]));
320 		if (rv != 0) {
321 			DERR(vswp, "%s: ldc_mem_nextcookie "
322 			    "failed rv (%d)", name, rv);
323 			goto fail;
324 		}
325 	}
326 
327 	/*
328 	 * Successful in binding the handle to rx data area. Now setup mblks
329 	 * around each data buffer and setup the descriptors to point to these
330 	 * rx data buffers. We associate each descriptor with a buffer
331 	 * by specifying the buffer offset in the descriptor. When the peer
332 	 * needs to transmit data, this offset is read by the peer to determine
333 	 * the buffer in the mapped buffer area where the data to be
334 	 * transmitted should be copied, for a specific descriptor.
335 	 */
336 	rv = vio_create_mblks(dp->num_bufs, data_sz, (uint8_t *)data_addr,
337 	    &dp->rx_vmp);
338 	if (rv != 0) {
339 		goto fail;
340 	}
341 
342 	pub_addr = dp->pub_addr;
343 	rxdp_to_vmp = dp->rxdp_to_vmp;
344 	for (i = 0; i < dp->num_descriptors; i++) {
345 		rxdp = &pub_addr[i];
346 		/* allocate an mblk around this data buffer */
347 		vmp = vio_allocb(dp->rx_vmp);
348 		ASSERT(vmp != NULL);
349 		rxdp->data_buf_offset = VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN;
350 		rxdp->dstate = VIO_DESC_FREE;
351 		rxdp_to_vmp[i] = vmp;
352 	}
353 
354 	return (0);
355 
356 fail:
357 	/* return failure; caller will cleanup */
358 	return (1);
359 }
360 
361 /*
362  * Free receive resources for the channel.
363  */
364 void
365 vsw_destroy_rx_dring(vsw_ldc_t *ldcp)
366 {
367 	vsw_t		*vswp = ldcp->ldc_vswp;
368 	lane_t		*lp = &ldcp->lane_out;
369 	dring_info_t	*dp;
370 
371 	dp = lp->dringp;
372 	if (dp == NULL) {
373 		return;
374 	}
375 
376 	mutex_enter(&dp->dlock);
377 
378 	if (dp->rx_vmp != NULL) {
379 		vio_clobber_pool(dp->rx_vmp);
380 		/*
381 		 * If we can't destroy the rx pool for this channel, dispatch a
382 		 * task to retry and clean up those rx pools. Note that we
383 		 * don't need to wait for the task to complete. If the vsw
384 		 * device itself gets detached (vsw_detach()), it will wait for
385 		 * the task to complete implicitly in ddi_taskq_destroy().
386 		 */
387 		if (vio_destroy_mblks(dp->rx_vmp) != 0)  {
388 			(void) ddi_taskq_dispatch(vswp->rxp_taskq,
389 			    vsw_destroy_rxpools, dp->rx_vmp, DDI_SLEEP);
390 		}
391 	}
392 
393 	/* Free rx data area cookies */
394 	if (dp->data_cookie != NULL) {
395 		kmem_free(dp->data_cookie, VNET_DATA_AREA_COOKIES *
396 		    sizeof (ldc_mem_cookie_t));
397 		dp->data_cookie = NULL;
398 	}
399 
400 	/* Unbind rx data area memhandle */
401 	if (dp->data_ncookies != 0) {
402 		(void) ldc_mem_unbind_handle(dp->data_handle);
403 		dp->data_ncookies = 0;
404 	}
405 
406 	/* Free rx data area memhandle */
407 	if (dp->data_handle) {
408 		(void) ldc_mem_free_handle(dp->data_handle);
409 		dp->data_handle = 0;
410 	}
411 
412 	/* Now free the rx data area itself */
413 	if (dp->data_addr != NULL) {
414 		kmem_free(dp->data_addr, dp->data_sz);
415 	}
416 
417 	/* Finally, free the receive descriptor ring */
418 	if (dp->dring_handle != NULL) {
419 		(void) ldc_mem_dring_unbind(dp->dring_handle);
420 		(void) ldc_mem_dring_destroy(dp->dring_handle);
421 	}
422 
423 	if (dp->rxdp_to_vmp != NULL) {
424 		kmem_free(dp->rxdp_to_vmp,
425 		    dp->num_descriptors * sizeof (uintptr_t));
426 		dp->rxdp_to_vmp = NULL;
427 	}
428 
429 	mutex_exit(&dp->dlock);
430 	mutex_destroy(&dp->dlock);
431 	mutex_destroy(&dp->restart_lock);
432 	kmem_free(dp, sizeof (dring_info_t));
433 	lp->dringp = NULL;
434 }
435 
436 /*
437  * Map the receive descriptor ring exported by the peer, as our transmit
438  * descriptor ring.
439  */
440 dring_info_t *
441 vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt)
442 {
443 	int				i;
444 	int				rv;
445 	dring_info_t			*dp;
446 	vnet_rx_dringdata_desc_t	*txdp;
447 	on_trap_data_t			otd;
448 	vio_dring_reg_msg_t		*dring_pkt = pkt;
449 
450 	dp = vsw_map_dring_cmn(ldcp, dring_pkt);
451 	if (dp == NULL) {
452 		return (NULL);
453 	}
454 
455 	/* RxDringData mode specific initializations */
456 	mutex_init(&dp->txlock, NULL, MUTEX_DRIVER, NULL);
457 	mutex_init(&dp->restart_lock, NULL, MUTEX_DRIVER, NULL);
458 	dp->next_txi = dp->restart_peer_txi = 0;
459 	dp->restart_reqd = B_TRUE;
460 	ldcp->dringdata_msgid = 0;
461 	ldcp->lane_in.dringp = dp;
462 
463 	/*
464 	 * Mark the descriptor state as 'done'. This is implementation specific
465 	 * and not required by the protocol. In our implementation, we only
466 	 * need the descripor to be in 'done' state to be used by the transmit
467 	 * function and the peer is not aware of it. As the protocol requires
468 	 * that during initial registration the exporting end point mark the
469 	 * dstate as 'free', we change it 'done' here. After this, the dstate
470 	 * in our implementation will keep moving between 'ready', set by our
471 	 * transmit function; and and 'done', set by the peer (per protocol)
472 	 * after receiving data.
473 	 * Setup on_trap() protection before accessing dring shared memory area.
474 	 */
475 	rv = LDC_ON_TRAP(&otd);
476 	if (rv != 0) {
477 		/*
478 		 * Data access fault occured down the code path below while
479 		 * accessing the descriptors. Return failure.
480 		 */
481 		goto fail;
482 	}
483 
484 	txdp = (vnet_rx_dringdata_desc_t *)dp->pub_addr;
485 	for (i = 0; i < dp->num_descriptors; i++) {
486 		txdp[i].dstate = VIO_DESC_DONE;
487 	}
488 
489 	(void) LDC_NO_TRAP();
490 
491 	return (dp);
492 
493 fail:
494 	if (dp->dring_handle != NULL) {
495 		(void) ldc_mem_dring_unmap(dp->dring_handle);
496 	}
497 	kmem_free(dp, sizeof (*dp));
498 	return (NULL);
499 }
500 
501 /*
502  * Unmap the transmit descriptor ring.
503  */
504 void
505 vsw_unmap_tx_dring(vsw_ldc_t *ldcp)
506 {
507 	lane_t		*lp = &ldcp->lane_in;
508 	dring_info_t	*dp;
509 
510 	if ((dp = lp->dringp) == NULL) {
511 		return;
512 	}
513 
514 	/* Unmap tx data area and free data handle */
515 	if (dp->data_handle != NULL) {
516 		(void) ldc_mem_unmap(dp->data_handle);
517 		(void) ldc_mem_free_handle(dp->data_handle);
518 		dp->data_handle = NULL;
519 	}
520 
521 	/* Free tx data area cookies */
522 	if (dp->data_cookie != NULL) {
523 		kmem_free(dp->data_cookie, dp->data_ncookies *
524 		    sizeof (ldc_mem_cookie_t));
525 		dp->data_cookie = NULL;
526 		dp->data_ncookies = 0;
527 	}
528 
529 	/* Unmap peer's dring */
530 	if (dp->dring_handle != NULL) {
531 		(void) ldc_mem_dring_unmap(dp->dring_handle);
532 		dp->dring_handle = NULL;
533 	}
534 
535 	mutex_destroy(&dp->txlock);
536 	kmem_free(dp, sizeof (dring_info_t));
537 	lp->dringp = NULL;
538 }
539 
540 /*
541  * A per LDC worker thread to process the rx dring and receive packets. This
542  * thread is woken up by the LDC interrupt handler when a dring data info
543  * message is received.
544  */
545 void
546 vsw_ldc_rcv_worker(void *arg)
547 {
548 	callb_cpr_t	cprinfo;
549 	vsw_ldc_t	*ldcp = (vsw_ldc_t *)arg;
550 	vsw_t		*vswp = ldcp->ldc_vswp;
551 
552 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
553 	CALLB_CPR_INIT(&cprinfo, &ldcp->rcv_thr_lock, callb_generic_cpr,
554 	    "vsw_rcv_thread");
555 	mutex_enter(&ldcp->rcv_thr_lock);
556 	while (!(ldcp->rcv_thr_flags & VSW_WTHR_STOP)) {
557 
558 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
559 		/*
560 		 * Wait until the data is received or a stop
561 		 * request is received.
562 		 */
563 		while (!(ldcp->rcv_thr_flags &
564 		    (VSW_WTHR_DATARCVD | VSW_WTHR_STOP))) {
565 			cv_wait(&ldcp->rcv_thr_cv, &ldcp->rcv_thr_lock);
566 		}
567 		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->rcv_thr_lock)
568 
569 		/*
570 		 * First process the stop request.
571 		 */
572 		if (ldcp->rcv_thr_flags & VSW_WTHR_STOP) {
573 			D2(vswp, "%s(%lld):Rx thread stopped\n",
574 			    __func__, ldcp->ldc_id);
575 			break;
576 		}
577 		ldcp->rcv_thr_flags &= ~VSW_WTHR_DATARCVD;
578 		mutex_exit(&ldcp->rcv_thr_lock);
579 		D1(vswp, "%s(%lld):calling vsw_process_pkt\n",
580 		    __func__, ldcp->ldc_id);
581 		vsw_ldc_rcv_shm(ldcp);
582 		mutex_enter(&ldcp->rcv_thr_lock);
583 	}
584 
585 	/*
586 	 * Update the run status and wakeup the thread that
587 	 * has sent the stop request.
588 	 */
589 	ldcp->rcv_thr_flags &= ~VSW_WTHR_STOP;
590 	ldcp->rcv_thread = NULL;
591 	CALLB_CPR_EXIT(&cprinfo);
592 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
593 	thread_exit();
594 }
595 
596 /*
597  * Process the rx descriptor ring in the context of receive worker
598  * thread and switch the received packets to their destinations.
599  */
600 static void
601 vsw_ldc_rcv_shm(vsw_ldc_t *ldcp)
602 {
603 	int		rv;
604 	uint32_t	end_ix;
605 	vio_dring_msg_t msg;
606 	vio_dring_msg_t	*msgp = &msg;
607 	int		count = 0;
608 	int		total_count = 0;
609 	uint32_t	retries = 0;
610 	mblk_t		*bp = NULL;
611 	mblk_t		*bpt = NULL;
612 	mblk_t		*mp = NULL;
613 	vsw_t		*vswp = ldcp->ldc_vswp;
614 	lane_t		*lp = &ldcp->lane_out;
615 	dring_info_t	*dp = lp->dringp;
616 
617 	do {
618 again:
619 		rv = vsw_receive_packet(ldcp, &mp);
620 		if (rv != 0) {
621 			if (rv == EINVAL) {
622 				/* Invalid descriptor error; get next */
623 				continue;
624 			}
625 			if (rv != EAGAIN) {
626 				break;
627 			}
628 
629 			/* Descriptor not ready for processsing */
630 			if (retries == vsw_recv_retries) {
631 				DTRACE_PROBE1(vsw_noready_rxds,
632 				    vsw_ldc_t *, ldcp);
633 				break;
634 			}
635 
636 			/* Switch packets received so far before retrying */
637 			if (bp != NULL) {
638 				VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
639 				    total_count);
640 			}
641 			retries++;
642 			drv_usecwait(vsw_recv_delay);
643 			goto again;
644 		}
645 		retries = 0;
646 
647 		/* Build a chain of received packets */
648 		if (bp == NULL) {
649 			/* first pkt */
650 			bp = mp;
651 			bpt = bp;
652 			bpt->b_next = NULL;
653 		} else {
654 			mp->b_next = NULL;
655 			bpt->b_next = mp;
656 			bpt = mp;
657 		}
658 
659 		total_count++;
660 		count++;
661 
662 		/*
663 		 * If we have gathered vsw_chain_len (tunable)
664 		 * # of packets in the chain, switch them.
665 		 */
666 		if (count == vsw_chain_len) {
667 			VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
668 			    total_count);
669 		}
670 
671 		/*
672 		 * Stop further processing if we processed the entire dring
673 		 * once; otherwise continue.
674 		 */
675 	} while (total_count < dp->num_bufs);
676 
677 	DTRACE_PROBE2(vsw_rx_total_count, vsw_ldc_t *, ldcp,
678 	    int, (total_count));
679 	if (bp != NULL) {
680 		VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
681 		    total_count);
682 	}
683 
684 	/* Send stopped signal to peer (sender) */
685 	end_ix = lp->dringp->next_rxi;
686 	DECR_RXI(dp, end_ix);
687 	msgp->tag.vio_msgtype = VIO_TYPE_DATA;
688 	msgp->tag.vio_subtype = VIO_SUBTYPE_ACK;
689 	msgp->tag.vio_subtype_env = VIO_DRING_DATA;
690 	msgp->dring_ident = ldcp->lane_in.dringp->ident;
691 	msgp->tag.vio_sid = ldcp->local_session;
692 	msgp->dring_process_state = VIO_DP_STOPPED;
693 	msgp->start_idx = VNET_START_IDX_UNSPEC;
694 	msgp->end_idx = end_ix;
695 
696 	(void) vsw_send_msg_shm(ldcp, (void *)msgp,
697 	    sizeof (vio_dring_msg_t), B_TRUE);
698 
699 	ldcp->ldc_stats.dring_data_acks_sent++;
700 	ldcp->ldc_stats.dring_stopped_acks_sent++;
701 }
702 
703 /*
704  * Process the next index in the rx dring and receive the associated packet.
705  *
706  * Returns:
707  *	bp:	Success: The received packet.
708  *		Failure: NULL
709  *      retval:
710  *		Success: 0
711  *		Failure: EAGAIN: Descriptor not ready
712  *			 EIO:    Descriptor contents invalid.
713  */
714 static int
715 vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp)
716 {
717 	uint32_t			rxi;
718 	vio_mblk_t			*vmp;
719 	vio_mblk_t			*new_vmp;
720 	struct ether_header		*ehp;
721 	vnet_rx_dringdata_desc_t	*rxdp;
722 	int				err = 0;
723 	uint_t				nbytes = 0;
724 	mblk_t				*mp = NULL;
725 	mblk_t				*dmp = NULL;
726 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
727 	dring_info_t			*dp = ldcp->lane_out.dringp;
728 	vnet_rx_dringdata_desc_t	*pub_addr = dp->pub_addr;
729 
730 	rxi = dp->next_rxi;
731 	rxdp = &(pub_addr[rxi]);
732 	vmp = dp->rxdp_to_vmp[rxi];
733 
734 	if (rxdp->dstate != VIO_DESC_READY) {
735 		/*
736 		 * Descriptor is not ready.
737 		 */
738 		return (EAGAIN);
739 	}
740 
741 	/*
742 	 * Ensure load ordering of dstate and nbytes.
743 	 */
744 	MEMBAR_CONSUMER();
745 
746 	if ((rxdp->nbytes < ETHERMIN) ||
747 	    (rxdp->nbytes > ldcp->lane_in.mtu) ||
748 	    (rxdp->data_buf_offset !=
749 	    (VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN))) {
750 		/*
751 		 * Descriptor contents invalid.
752 		 */
753 		statsp->ierrors++;
754 		rxdp->dstate = VIO_DESC_DONE;
755 		err = EIO;
756 		goto done;
757 	}
758 
759 	/*
760 	 * Now allocate a new buffer for this descriptor before sending up the
761 	 * buffer being processed. If that fails, stop processing; as we are
762 	 * out of receive buffers.
763 	 */
764 	new_vmp = vio_allocb(dp->rx_vmp);
765 
766 	/*
767 	 * Process the current buffer being received.
768 	 */
769 	nbytes = rxdp->nbytes;
770 	mp = vmp->mp;
771 
772 	if (new_vmp == NULL) {
773 		/*
774 		 * We failed to get a new mapped buffer that is needed to
775 		 * refill the descriptor. In that case, leave the current
776 		 * buffer bound to the descriptor; allocate an mblk dynamically
777 		 * and copy the contents of the buffer to the mblk. Then send
778 		 * up this mblk. This way the sender has the same buffer as
779 		 * before that can be used to send new data.
780 		 */
781 		statsp->norcvbuf++;
782 		dmp = allocb(nbytes + VNET_IPALIGN, BPRI_MED);
783 		bcopy(mp->b_rptr + VNET_IPALIGN,
784 		    dmp->b_rptr + VNET_IPALIGN, nbytes);
785 		mp = dmp;
786 	} else {
787 		/* Mark the status of the current rbuf */
788 		vmp->state = VIO_MBLK_HAS_DATA;
789 
790 		/* Set the offset of the new buffer in the descriptor */
791 		rxdp->data_buf_offset =
792 		    VIO_MBLK_DATA_OFF(new_vmp) + VNET_IPALIGN;
793 		dp->rxdp_to_vmp[rxi] = new_vmp;
794 	}
795 	mp->b_rptr += VNET_IPALIGN;
796 	mp->b_wptr = mp->b_rptr + nbytes;
797 
798 	/*
799 	 * Ensure store ordering of data_buf_offset and dstate; so that the
800 	 * peer sees the right data_buf_offset after it checks that the dstate
801 	 * is DONE.
802 	 */
803 	MEMBAR_PRODUCER();
804 
805 	/* Now mark the descriptor 'done' */
806 	rxdp->dstate = VIO_DESC_DONE;
807 
808 	/* Update stats */
809 	statsp->ipackets++;
810 	statsp->rbytes += rxdp->nbytes;
811 	ehp = (struct ether_header *)mp->b_rptr;
812 	if (IS_BROADCAST(ehp))
813 		statsp->brdcstrcv++;
814 	else if (IS_MULTICAST(ehp))
815 		statsp->multircv++;
816 done:
817 	/* Update the next index to be processed */
818 	INCR_RXI(dp, rxi);
819 
820 	/* Save the new recv index */
821 	dp->next_rxi = rxi;
822 
823 	/* Return the packet received */
824 	*bp = mp;
825 	return (err);
826 }
827 
828 void
829 vsw_stop_rcv_thread(vsw_ldc_t *ldcp)
830 {
831 	kt_did_t	tid = 0;
832 	vsw_t		*vswp = ldcp->ldc_vswp;
833 
834 	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
835 	/*
836 	 * Send a stop request by setting the stop flag and
837 	 * wait until the rcv process thread stops.
838 	 */
839 	mutex_enter(&ldcp->rcv_thr_lock);
840 	if (ldcp->rcv_thread != NULL) {
841 		tid = ldcp->rcv_thread->t_did;
842 		ldcp->rcv_thr_flags |= VSW_WTHR_STOP;
843 		cv_signal(&ldcp->rcv_thr_cv);
844 	}
845 	mutex_exit(&ldcp->rcv_thr_lock);
846 
847 	if (tid != 0) {
848 		thread_join(tid);
849 	}
850 	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
851 }
852 
853 int
854 vsw_dringsend_shm(vsw_ldc_t *ldcp, mblk_t *mp)
855 {
856 	uint32_t			next_txi;
857 	uint32_t			txi;
858 	vnet_rx_dringdata_desc_t	*txdp;
859 	struct ether_header		*ehp;
860 	size_t				mblksz;
861 	caddr_t				dst;
862 	mblk_t				*bp;
863 	size_t				size;
864 	on_trap_data_t			otd;
865 	uint32_t			buf_offset;
866 	vnet_rx_dringdata_desc_t	*pub_addr;
867 	vio_dring_msg_t			msg;
868 	vio_dring_msg_t			*msgp = &msg;
869 	int				rv = 0;
870 	boolean_t			resched_peer = B_FALSE;
871 	boolean_t			is_bcast = B_FALSE;
872 	boolean_t			is_mcast = B_FALSE;
873 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
874 	lane_t				*lane_in = &ldcp->lane_in;
875 	lane_t				*lane_out = &ldcp->lane_out;
876 	dring_info_t			*dp = lane_in->dringp;
877 	vsw_t				*vswp = ldcp->ldc_vswp;
878 
879 	if ((!(lane_in->lstate & VSW_LANE_ACTIVE)) ||
880 	    (ldcp->ldc_status != LDC_UP) || (ldcp->ldc_handle == NULL)) {
881 		DWARN(vswp, "%s(%lld) status(%d) lstate(0x%llx), dropping "
882 		    "packet\n", __func__, ldcp->ldc_id, ldcp->ldc_status,
883 		    lane_in->lstate);
884 		statsp->oerrors++;
885 		return (LDC_TX_FAILURE);
886 	}
887 
888 	if (dp == NULL) {
889 		DERR(vswp, "%s(%lld): no dring for outbound lane on"
890 		    " channel %d", __func__, ldcp->ldc_id, ldcp->ldc_id);
891 		statsp->oerrors++;
892 		return (LDC_TX_FAILURE);
893 	}
894 	pub_addr = dp->pub_addr;
895 
896 	size = msgsize(mp);
897 
898 	/*
899 	 * Note: In RxDringData mode, lane_in is associated with transmit and
900 	 * lane_out is associated with receive. However, we still keep the
901 	 * negotiated mtu in lane_out (our exported attributes).
902 	 */
903 	if (size > (size_t)lane_out->mtu) {
904 		DERR(vswp, "%s(%lld) invalid size (%ld)\n", __func__,
905 		    ldcp->ldc_id, size);
906 		statsp->oerrors++;
907 		return (LDC_TX_FAILURE);
908 	}
909 
910 	if (size < ETHERMIN)
911 		size = ETHERMIN;
912 
913 	ehp = (struct ether_header *)mp->b_rptr;
914 	is_bcast = IS_BROADCAST(ehp);
915 	is_mcast = IS_MULTICAST(ehp);
916 
917 	/*
918 	 * Setup on_trap() protection before accessing shared memory areas
919 	 * (descriptor and data buffer). Note that we enable this protection a
920 	 * little early and turn it off slightly later, than keeping it enabled
921 	 * strictly at the points in code below where the descriptor and data
922 	 * buffer are accessed. This is done for performance reasons:
923 	 * (a) to avoid calling the trap protection code while holding mutex.
924 	 * (b) to avoid multiple on/off steps for descriptor and data accesses.
925 	 */
926 	rv = LDC_ON_TRAP(&otd);
927 	if (rv != 0) {
928 		/*
929 		 * Data access fault occured down the code path below while
930 		 * accessing either the descriptor or the data buffer. Release
931 		 * any locks that we might have acquired in the code below and
932 		 * return failure.
933 		 */
934 		DERR(vswp, "%s(%lld) data access fault occured\n",
935 		    __func__, ldcp->ldc_id);
936 		statsp->oerrors++;
937 		if (mutex_owned(&dp->txlock)) {
938 			mutex_exit(&dp->txlock);
939 		}
940 		if (mutex_owned(&dp->restart_lock)) {
941 			mutex_exit(&dp->restart_lock);
942 		}
943 		goto dringsend_shm_exit;
944 	}
945 
946 	/*
947 	 * Allocate a descriptor
948 	 */
949 	mutex_enter(&dp->txlock);
950 	txi = next_txi = dp->next_txi;
951 	INCR_TXI(dp, next_txi);
952 	txdp = &(pub_addr[txi]);
953 	if (txdp->dstate != VIO_DESC_DONE) { /* out of descriptors */
954 		statsp->tx_no_desc++;
955 		mutex_exit(&dp->txlock);
956 		(void) LDC_NO_TRAP();
957 		return (LDC_TX_NORESOURCES);
958 	} else {
959 		txdp->dstate = VIO_DESC_INITIALIZING;
960 	}
961 
962 	/* Update descriptor ring index */
963 	dp->next_txi = next_txi;
964 	mutex_exit(&dp->txlock);
965 
966 	/* Ensure load ordering of dstate (above) and data_buf_offset. */
967 	MEMBAR_CONSUMER();
968 
969 	/* Get the offset of the buffer to be used */
970 	buf_offset = txdp->data_buf_offset;
971 
972 	/* Access the buffer using the offset */
973 	dst = (caddr_t)dp->data_addr + buf_offset;
974 
975 	/* Copy data into mapped transmit buffer */
976 	for (bp = mp; bp != NULL; bp = bp->b_cont) {
977 		mblksz = MBLKL(bp);
978 		bcopy(bp->b_rptr, dst, mblksz);
979 		dst += mblksz;
980 	}
981 
982 	/* Set the size of data in the descriptor */
983 	txdp->nbytes = size;
984 
985 	/*
986 	 * Ensure store ordering of nbytes and dstate (below); so that the peer
987 	 * sees the right nbytes value after it checks that the dstate is READY.
988 	 */
989 	MEMBAR_PRODUCER();
990 
991 	mutex_enter(&dp->restart_lock);
992 
993 	ASSERT(txdp->dstate == VIO_DESC_INITIALIZING);
994 
995 	/* Mark the descriptor ready */
996 	txdp->dstate = VIO_DESC_READY;
997 
998 	/* Check if peer needs wake up (handled below) */
999 	if (dp->restart_reqd == B_TRUE && dp->restart_peer_txi == txi) {
1000 		dp->restart_reqd = B_FALSE;
1001 		resched_peer = B_TRUE;
1002 	}
1003 
1004 	/* Update tx stats */
1005 	statsp->opackets++;
1006 	statsp->obytes += size;
1007 	if (is_bcast)
1008 		statsp->brdcstxmt++;
1009 	else if (is_mcast)
1010 		statsp->multixmt++;
1011 
1012 	mutex_exit(&dp->restart_lock);
1013 
1014 	/*
1015 	 * We are done accessing shared memory; clear trap protection.
1016 	 */
1017 	(void) LDC_NO_TRAP();
1018 
1019 	/*
1020 	 * Need to wake up the peer ?
1021 	 */
1022 	if (resched_peer == B_TRUE) {
1023 		msgp->tag.vio_msgtype = VIO_TYPE_DATA;
1024 		msgp->tag.vio_subtype = VIO_SUBTYPE_INFO;
1025 		msgp->tag.vio_subtype_env = VIO_DRING_DATA;
1026 		msgp->tag.vio_sid = ldcp->local_session;
1027 		msgp->dring_ident = lane_out->dringp->ident;
1028 		msgp->start_idx = txi;
1029 		msgp->end_idx = -1;
1030 
1031 		rv = vsw_send_msg_shm(ldcp, (void *)msgp, sizeof (*msgp),
1032 		    B_FALSE);
1033 		if (rv != 0) {
1034 			/* error: drop the packet */
1035 			DERR(vswp, "%s(%lld) failed sending dringdata msg\n",
1036 			    __func__, ldcp->ldc_id);
1037 			mutex_enter(&dp->restart_lock);
1038 			statsp->oerrors++;
1039 			dp->restart_reqd = B_TRUE;
1040 			mutex_exit(&dp->restart_lock);
1041 		}
1042 		statsp->dring_data_msgs_sent++;
1043 	}
1044 
1045 dringsend_shm_exit:
1046 	if (rv == ECONNRESET || rv == EACCES) {
1047 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1048 	}
1049 	return (LDC_TX_SUCCESS);
1050 }
1051 
1052 void
1053 vsw_process_dringdata_shm(void *arg, void *dpkt)
1054 {
1055 	vsw_ldc_t		*ldcp = arg;
1056 	vsw_t			*vswp = ldcp->ldc_vswp;
1057 	vio_dring_msg_t		*dring_pkt = dpkt;
1058 
1059 	switch (dring_pkt->tag.vio_subtype) {
1060 	case VIO_SUBTYPE_INFO:
1061 		D2(vswp, "%s(%lld): VIO_SUBTYPE_INFO", __func__, ldcp->ldc_id);
1062 		vsw_process_dringdata_info_shm(ldcp, dring_pkt);
1063 		break;
1064 
1065 	case VIO_SUBTYPE_ACK:
1066 		D2(vswp, "%s(%lld): VIO_SUBTYPE_ACK", __func__, ldcp->ldc_id);
1067 		vsw_process_dringdata_ack_shm(ldcp, dring_pkt);
1068 		break;
1069 
1070 	case VIO_SUBTYPE_NACK:
1071 		DWARN(vswp, "%s(%lld): VIO_SUBTYPE_NACK",
1072 		    __func__, ldcp->ldc_id);
1073 		/*
1074 		 * Something is badly wrong if we are getting NACK's
1075 		 * for our data pkts. So reset the channel.
1076 		 */
1077 		vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
1078 		break;
1079 
1080 	default:
1081 		DERR(vswp, "%s(%lld): Unknown vio_subtype %x\n", __func__,
1082 		    ldcp->ldc_id, dring_pkt->tag.vio_subtype);
1083 	}
1084 }
1085 
1086 static void
1087 vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1088 {
1089 	dring_info_t	*dp = ldcp->lane_in.dringp;
1090 	vsw_t		*vswp = ldcp->ldc_vswp;
1091 	vgen_stats_t	*statsp = &ldcp->ldc_stats;
1092 
1093 	if (dp->ident != msg->dring_ident) {
1094 		/* drop the message */
1095 		DERR(vswp, "%s(%lld): Invalid dring ident 0x%llx",
1096 		    __func__, ldcp->ldc_id, msg->dring_ident);
1097 		return;
1098 	}
1099 
1100 	statsp->dring_data_msgs_rcvd++;
1101 
1102 	/*
1103 	 * Wake up the rcv worker thread to process the rx dring.
1104 	 */
1105 	ASSERT(MUTEX_HELD(&ldcp->ldc_cblock));
1106 	mutex_exit(&ldcp->ldc_cblock);
1107 	mutex_enter(&ldcp->rcv_thr_lock);
1108 	if (!(ldcp->rcv_thr_flags & VSW_WTHR_DATARCVD)) {
1109 		ldcp->rcv_thr_flags |= VSW_WTHR_DATARCVD;
1110 		cv_signal(&ldcp->rcv_thr_cv);
1111 	}
1112 	mutex_exit(&ldcp->rcv_thr_lock);
1113 	mutex_enter(&ldcp->ldc_cblock);
1114 }
1115 
1116 static void
1117 vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
1118 {
1119 	dring_info_t			*dp;
1120 	uint32_t			start;
1121 	int32_t				end;
1122 	int				rv;
1123 	on_trap_data_t			otd;
1124 	uint32_t			txi;
1125 	vnet_rx_dringdata_desc_t	*txdp;
1126 	vnet_rx_dringdata_desc_t	*pub_addr;
1127 	boolean_t			ready_txd = B_FALSE;
1128 	vsw_t				*vswp = ldcp->ldc_vswp;
1129 	vgen_stats_t			*statsp = &ldcp->ldc_stats;
1130 
1131 	dp = ldcp->lane_in.dringp;
1132 	start = msg->start_idx;
1133 	end = msg->end_idx;
1134 	pub_addr = dp->pub_addr;
1135 
1136 	/*
1137 	 * In RxDringData mode (v1.6), start index of -1 can be used by the
1138 	 * peer to indicate that it is unspecified. However, the end index
1139 	 * must be set correctly indicating the last descriptor index processed.
1140 	 */
1141 	if (((start != VNET_START_IDX_UNSPEC) && !(CHECK_TXI(dp, start))) ||
1142 	    !(CHECK_TXI(dp, end))) {
1143 		/* drop the message if invalid index */
1144 		DWARN(vswp, "%s(%lld): Invalid Tx ack start(%d) or end(%d)\n",
1145 		    __func__, ldcp->ldc_id, start, end);
1146 		return;
1147 	}
1148 
1149 	/* Validate dring_ident */
1150 	if (msg->dring_ident != ldcp->lane_out.dringp->ident) {
1151 		/* invalid dring_ident, drop the msg */
1152 		DWARN(vswp, "%s(%lld): Invalid dring ident 0x%x\n",
1153 		    __func__, ldcp->ldc_id, msg->dring_ident);
1154 		return;
1155 	}
1156 	statsp->dring_data_acks_rcvd++;
1157 
1158 	if (msg->dring_process_state != VIO_DP_STOPPED) {
1159 		/*
1160 		 * Receiver continued processing
1161 		 * dring after sending us the ack.
1162 		 */
1163 		return;
1164 	}
1165 
1166 	statsp->dring_stopped_acks_rcvd++;
1167 
1168 	/*
1169 	 * Setup on_trap() protection before accessing dring shared memory area.
1170 	 */
1171 	rv = LDC_ON_TRAP(&otd);
1172 	if (rv != 0) {
1173 		/*
1174 		 * Data access fault occured down the code path below while
1175 		 * accessing the descriptors. Release any locks that we might
1176 		 * have acquired in the code below and return failure.
1177 		 */
1178 		if (mutex_owned(&dp->restart_lock)) {
1179 			mutex_exit(&dp->restart_lock);
1180 		}
1181 		return;
1182 	}
1183 
1184 	/*
1185 	 * Determine if there are any pending tx descriptors ready to be
1186 	 * processed by the receiver(peer) and if so, send a message to the
1187 	 * peer to restart receiving.
1188 	 */
1189 	mutex_enter(&dp->restart_lock);
1190 
1191 	ready_txd = B_FALSE;
1192 	txi = end;
1193 	INCR_TXI(dp, txi);
1194 	txdp = &pub_addr[txi];
1195 	if (txdp->dstate == VIO_DESC_READY) {
1196 		ready_txd = B_TRUE;
1197 	}
1198 
1199 	/*
1200 	 * We are done accessing shared memory; clear trap protection.
1201 	 */
1202 	(void) LDC_NO_TRAP();
1203 
1204 	if (ready_txd == B_FALSE) {
1205 		/*
1206 		 * No ready tx descriptors. Set the flag to send a message to
1207 		 * the peer when tx descriptors are ready in transmit routine.
1208 		 */
1209 		dp->restart_reqd = B_TRUE;
1210 		dp->restart_peer_txi = txi;
1211 		mutex_exit(&dp->restart_lock);
1212 		return;
1213 	}
1214 
1215 	/*
1216 	 * We have some tx descriptors ready to be processed by the receiver.
1217 	 * Send a dring data message to the peer to restart processing.
1218 	 */
1219 	dp->restart_reqd = B_FALSE;
1220 	mutex_exit(&dp->restart_lock);
1221 
1222 	msg->tag.vio_msgtype = VIO_TYPE_DATA;
1223 	msg->tag.vio_subtype = VIO_SUBTYPE_INFO;
1224 	msg->tag.vio_subtype_env = VIO_DRING_DATA;
1225 	msg->tag.vio_sid = ldcp->local_session;
1226 	msg->dring_ident = ldcp->lane_out.dringp->ident;
1227 	msg->start_idx = txi;
1228 	msg->end_idx = -1;
1229 	rv = vsw_send_msg_shm(ldcp, (void *)msg,
1230 	    sizeof (vio_dring_msg_t), B_FALSE);
1231 	statsp->dring_data_msgs_sent++;
1232 	if (rv != 0) {
1233 		mutex_enter(&dp->restart_lock);
1234 		dp->restart_reqd = B_TRUE;
1235 		mutex_exit(&dp->restart_lock);
1236 	}
1237 
1238 	if (rv == ECONNRESET) {
1239 		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1240 	}
1241 }
1242 
1243 /*
1244  * Send dring data msgs (info/ack/nack) over LDC.
1245  */
1246 int
1247 vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size, boolean_t handle_reset)
1248 {
1249 	int			rv;
1250 	int			retries = vsw_wretries;
1251 	size_t			msglen = size;
1252 	vsw_t			*vswp = ldcp->ldc_vswp;
1253 	vio_dring_msg_t		*dmsg = (vio_dring_msg_t *)msgp;
1254 
1255 	D1(vswp, "vsw_send_msg (%lld) enter : sending %d bytes",
1256 	    ldcp->ldc_id, size);
1257 
1258 	dmsg->seq_num = atomic_inc_32_nv(&ldcp->dringdata_msgid);
1259 
1260 	do {
1261 		msglen = size;
1262 		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msgp, &msglen);
1263 	} while (rv == EWOULDBLOCK && --retries > 0);
1264 
1265 	if ((rv != 0) || (msglen != size)) {
1266 		DERR(vswp, "vsw_send_msg_shm:ldc_write failed: "
1267 		    "chan(%lld) rv(%d) size (%d) msglen(%d)\n",
1268 		    ldcp->ldc_id, rv, size, msglen);
1269 		ldcp->ldc_stats.oerrors++;
1270 	}
1271 
1272 	/*
1273 	 * If channel has been reset we either handle it here or
1274 	 * simply report back that it has been reset and let caller
1275 	 * decide what to do.
1276 	 */
1277 	if (rv == ECONNRESET) {
1278 		DWARN(vswp, "%s (%lld) channel reset", __func__, ldcp->ldc_id);
1279 
1280 		if (handle_reset) {
1281 			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
1282 		}
1283 	}
1284 
1285 	return (rv);
1286 }
1287