xref: /titanic_50/usr/src/uts/sun4v/io/vnet_txdring.c (revision 356f72340a69936724c69f2f87fffa6f5887f885)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 #include <sys/types.h>
27 #include <sys/errno.h>
28 #include <sys/sysmacros.h>
29 #include <sys/param.h>
30 #include <sys/machsystm.h>
31 #include <sys/stream.h>
32 #include <sys/strsubr.h>
33 #include <sys/kmem.h>
34 #include <sys/strsun.h>
35 #include <sys/callb.h>
36 #include <sys/sdt.h>
37 #include <sys/ethernet.h>
38 #include <sys/mach_descrip.h>
39 #include <sys/mdeg.h>
40 #include <sys/vnet.h>
41 #include <sys/vio_mailbox.h>
42 #include <sys/vio_common.h>
43 #include <sys/vnet_common.h>
44 #include <sys/vnet_mailbox.h>
45 #include <sys/vio_util.h>
46 #include <sys/vnet_gen.h>
47 
48 /*
49  * This file contains the implementation of TxDring data transfer mode of VIO
50  * Protocol in vnet. The functions in this file are invoked from vnet_gen.c
51  * after TxDring mode is negotiated with the peer during attribute phase of
52  * handshake. This file contains functions that setup the transmit and receive
53  * descriptor rings, and associated resources in TxDring mode. It also contains
54  * the transmit and receive data processing functions that are invoked in
55  * TxDring mode.
56  */
57 
58 /* Functions exported to vnet_gen.c */
59 int vgen_create_tx_dring(vgen_ldc_t *ldcp);
60 void vgen_destroy_tx_dring(vgen_ldc_t *ldcp);
61 int vgen_map_rx_dring(vgen_ldc_t *ldcp, void *pkt);
62 void vgen_unmap_rx_dring(vgen_ldc_t *ldcp);
63 int vgen_dringsend(void *arg, mblk_t *mp);
64 void vgen_ldc_msg_worker(void *arg);
65 void vgen_stop_msg_thread(vgen_ldc_t *ldcp);
66 int vgen_handle_dringdata(void *arg1, void *arg2);
67 mblk_t *vgen_poll_rcv(vgen_ldc_t *ldcp, int bytes_to_pickup);
68 int vgen_check_datamsg_seq(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp);
69 int vgen_sendmsg(vgen_ldc_t *ldcp, caddr_t msg,  size_t msglen,
70     boolean_t caller_holds_lock);
71 
72 /* Internal functions */
73 static int vgen_init_multipools(vgen_ldc_t *ldcp);
74 static int vgen_handle_dringdata_info(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp);
75 static int vgen_process_dringdata(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp);
76 static int vgen_handle_dringdata_ack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp);
77 static int vgen_handle_dringdata_nack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp);
78 static void vgen_rx(vgen_ldc_t *ldcp, mblk_t *bp, mblk_t *bpt);
79 static int vgen_send_dringdata(vgen_ldc_t *ldcp, uint32_t start, int32_t end);
80 static int vgen_send_dringack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp,
81     uint32_t start, int32_t end, uint8_t pstate);
82 static void vgen_reclaim(vgen_ldc_t *ldcp);
83 static void vgen_reclaim_dring(vgen_ldc_t *ldcp);
84 
85 /* Functions imported from vnet_gen.c */
86 extern int vgen_handle_evt_read(vgen_ldc_t *ldcp, vgen_caller_t caller);
87 extern int vgen_handle_evt_reset(vgen_ldc_t *ldcp, vgen_caller_t caller);
88 extern void vgen_handle_pkt_data(void *arg1, void *arg2, uint32_t msglen);
89 extern void vgen_destroy_rxpools(void *arg);
90 
91 /* Tunables */
92 extern int vgen_rxpool_cleanup_delay;
93 extern boolean_t vnet_jumbo_rxpools;
94 extern uint32_t vnet_num_descriptors;
95 extern uint32_t vgen_chain_len;
96 extern uint32_t vgen_ldcwr_retries;
97 extern uint32_t vgen_recv_delay;
98 extern uint32_t vgen_recv_retries;
99 extern uint32_t vgen_rbufsz1;
100 extern uint32_t vgen_rbufsz2;
101 extern uint32_t vgen_rbufsz3;
102 extern uint32_t vgen_rbufsz4;
103 extern uint32_t vgen_nrbufs1;
104 extern uint32_t vgen_nrbufs2;
105 extern uint32_t vgen_nrbufs3;
106 extern uint32_t vgen_nrbufs4;
107 
108 #ifdef DEBUG
109 
110 #define	DEBUG_PRINTF	vgen_debug_printf
111 
112 extern int vnet_dbglevel;
113 extern int vgen_inject_err_flag;
114 
115 extern void vgen_debug_printf(const char *fname, vgen_t *vgenp,
116 	vgen_ldc_t *ldcp, const char *fmt, ...);
117 extern boolean_t vgen_inject_error(vgen_ldc_t *ldcp, int error);
118 
119 #endif
120 
121 /*
122  * Allocate transmit resources for the channel. The resources consist of a
123  * transmit descriptor ring and an associated transmit buffer area.
124  */
125 int
126 vgen_create_tx_dring(vgen_ldc_t *ldcp)
127 {
128 	int 			i;
129 	int 			rv;
130 	ldc_mem_info_t		minfo;
131 	uint32_t		txdsize;
132 	uint32_t		tbufsize;
133 	vgen_private_desc_t	*tbufp;
134 	vnet_public_desc_t	*txdp;
135 	vio_dring_entry_hdr_t	*hdrp;
136 	caddr_t			datap = NULL;
137 	int			ci;
138 	uint32_t		ncookies;
139 	size_t			data_sz;
140 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
141 
142 	ldcp->num_txds = vnet_num_descriptors;
143 	txdsize = sizeof (vnet_public_desc_t);
144 	tbufsize = sizeof (vgen_private_desc_t);
145 
146 	/* allocate transmit buffer ring */
147 	tbufp = kmem_zalloc(ldcp->num_txds * tbufsize, KM_NOSLEEP);
148 	if (tbufp == NULL) {
149 		return (DDI_FAILURE);
150 	}
151 	ldcp->tbufp = tbufp;
152 	ldcp->tbufendp = &((ldcp->tbufp)[ldcp->num_txds]);
153 
154 	/* create transmit descriptor ring */
155 	rv = ldc_mem_dring_create(ldcp->num_txds, txdsize,
156 	    &ldcp->tx_dring_handle);
157 	if (rv != 0) {
158 		DWARN(vgenp, ldcp, "ldc_mem_dring_create() failed\n");
159 		goto fail;
160 	}
161 
162 	/* get the addr of descriptor ring */
163 	rv = ldc_mem_dring_info(ldcp->tx_dring_handle, &minfo);
164 	if (rv != 0) {
165 		DWARN(vgenp, ldcp, "ldc_mem_dring_info() failed\n");
166 		goto fail;
167 	}
168 	ldcp->txdp = (vnet_public_desc_t *)(minfo.vaddr);
169 
170 	/*
171 	 * In order to ensure that the number of ldc cookies per descriptor is
172 	 * limited to be within the default MAX_COOKIES (2), we take the steps
173 	 * outlined below:
174 	 *
175 	 * Align the entire data buffer area to 8K and carve out per descriptor
176 	 * data buffers starting from this 8K aligned base address.
177 	 *
178 	 * We round up the mtu specified to be a multiple of 2K or 4K.
179 	 * For sizes up to 12K we round up the size to the next 2K.
180 	 * For sizes > 12K we round up to the next 4K (otherwise sizes such as
181 	 * 14K could end up needing 3 cookies, with the buffer spread across
182 	 * 3 8K pages:  8K+6K, 2K+8K+2K, 6K+8K, ...).
183 	 */
184 	data_sz = vgenp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
185 	if (data_sz <= VNET_12K) {
186 		data_sz = VNET_ROUNDUP_2K(data_sz);
187 	} else {
188 		data_sz = VNET_ROUNDUP_4K(data_sz);
189 	}
190 
191 	/* allocate extra 8K bytes for alignment */
192 	ldcp->tx_data_sz = (data_sz * ldcp->num_txds) + VNET_8K;
193 	datap = kmem_zalloc(ldcp->tx_data_sz, KM_SLEEP);
194 	ldcp->tx_datap = datap;
195 
196 
197 	/* align the starting address of the data area to 8K */
198 	datap = (caddr_t)VNET_ROUNDUP_8K((uintptr_t)datap);
199 
200 	/*
201 	 * for each private descriptor, allocate a ldc mem_handle which is
202 	 * required to map the data during transmit, set the flags
203 	 * to free (available for use by transmit routine).
204 	 */
205 
206 	for (i = 0; i < ldcp->num_txds; i++) {
207 
208 		tbufp = &(ldcp->tbufp[i]);
209 		rv = ldc_mem_alloc_handle(ldcp->ldc_handle,
210 		    &(tbufp->memhandle));
211 		if (rv) {
212 			tbufp->memhandle = 0;
213 			goto fail;
214 		}
215 
216 		/*
217 		 * bind ldc memhandle to the corresponding transmit buffer.
218 		 */
219 		ci = ncookies = 0;
220 		rv = ldc_mem_bind_handle(tbufp->memhandle,
221 		    (caddr_t)datap, data_sz, LDC_SHADOW_MAP,
222 		    LDC_MEM_R, &(tbufp->memcookie[ci]), &ncookies);
223 		if (rv != 0) {
224 			goto fail;
225 		}
226 
227 		/*
228 		 * successful in binding the handle to tx data buffer.
229 		 * set datap in the private descr to this buffer.
230 		 */
231 		tbufp->datap = datap;
232 
233 		if ((ncookies == 0) ||
234 		    (ncookies > MAX_COOKIES)) {
235 			goto fail;
236 		}
237 
238 		for (ci = 1; ci < ncookies; ci++) {
239 			rv = ldc_mem_nextcookie(tbufp->memhandle,
240 			    &(tbufp->memcookie[ci]));
241 			if (rv != 0) {
242 				goto fail;
243 			}
244 		}
245 
246 		tbufp->ncookies = ncookies;
247 		datap += data_sz;
248 
249 		tbufp->flags = VGEN_PRIV_DESC_FREE;
250 		txdp = &(ldcp->txdp[i]);
251 		hdrp = &txdp->hdr;
252 		hdrp->dstate = VIO_DESC_FREE;
253 		hdrp->ack = B_FALSE;
254 		tbufp->descp = txdp;
255 
256 	}
257 
258 	/*
259 	 * The descriptors and the associated buffers are all ready;
260 	 * now bind descriptor ring to the channel.
261 	 */
262 	rv = ldc_mem_dring_bind(ldcp->ldc_handle, ldcp->tx_dring_handle,
263 	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
264 	    &ldcp->tx_dring_cookie, &ncookies);
265 	if (rv != 0) {
266 		DWARN(vgenp, ldcp, "ldc_mem_dring_bind failed "
267 		    "rv(%x)\n", rv);
268 		goto fail;
269 	}
270 	ASSERT(ncookies == 1);
271 	ldcp->tx_dring_ncookies = ncookies;
272 
273 	/* reset tbuf walking pointers */
274 	ldcp->next_tbufp = ldcp->tbufp;
275 	ldcp->cur_tbufp = ldcp->tbufp;
276 
277 	/* initialize tx seqnum and index */
278 	ldcp->next_txseq = VNET_ISS;
279 	ldcp->next_txi = 0;
280 
281 	ldcp->resched_peer = B_TRUE;
282 	ldcp->resched_peer_txi = 0;
283 
284 	return (VGEN_SUCCESS);
285 
286 fail:
287 	vgen_destroy_tx_dring(ldcp);
288 	return (VGEN_FAILURE);
289 }
290 
291 /*
292  * Free transmit resources for the channel.
293  */
294 void
295 vgen_destroy_tx_dring(vgen_ldc_t *ldcp)
296 {
297 	int 			i;
298 	int			tbufsize = sizeof (vgen_private_desc_t);
299 	vgen_private_desc_t	*tbufp = ldcp->tbufp;
300 
301 	/* We first unbind the descriptor ring */
302 	if (ldcp->tx_dring_ncookies != 0) {
303 		(void) ldc_mem_dring_unbind(ldcp->tx_dring_handle);
304 		ldcp->tx_dring_ncookies = 0;
305 	}
306 
307 	/* Unbind transmit buffers */
308 	if (ldcp->tbufp != NULL) {
309 		/* for each tbuf (priv_desc), free ldc mem_handle */
310 		for (i = 0; i < ldcp->num_txds; i++) {
311 
312 			tbufp = &(ldcp->tbufp[i]);
313 
314 			if (tbufp->datap) { /* if bound to a ldc memhandle */
315 				(void) ldc_mem_unbind_handle(tbufp->memhandle);
316 				tbufp->datap = NULL;
317 			}
318 			if (tbufp->memhandle) {
319 				(void) ldc_mem_free_handle(tbufp->memhandle);
320 				tbufp->memhandle = 0;
321 			}
322 		}
323 	}
324 
325 	/* Free tx data buffer area */
326 	if (ldcp->tx_datap != NULL) {
327 		kmem_free(ldcp->tx_datap, ldcp->tx_data_sz);
328 		ldcp->tx_datap = NULL;
329 		ldcp->tx_data_sz = 0;
330 	}
331 
332 	/* Free transmit descriptor ring */
333 	if (ldcp->tx_dring_handle != 0) {
334 		(void) ldc_mem_dring_destroy(ldcp->tx_dring_handle);
335 		ldcp->tx_dring_handle = 0;
336 		ldcp->txdp = NULL;
337 	}
338 
339 	/* Free transmit buffer ring */
340 	if (ldcp->tbufp != NULL) {
341 		kmem_free(ldcp->tbufp, ldcp->num_txds * tbufsize);
342 		ldcp->tbufp = ldcp->tbufendp = NULL;
343 	}
344 }
345 
346 /*
347  * Map the transmit descriptor ring exported
348  * by the peer, as our receive descriptor ring.
349  */
350 int
351 vgen_map_rx_dring(vgen_ldc_t *ldcp, void *pkt)
352 {
353 	int			rv;
354 	ldc_mem_info_t		minfo;
355 	ldc_mem_cookie_t	dcookie;
356 	uint32_t		ncookies;
357 	uint32_t 		num_desc;
358 	uint32_t		desc_size;
359 	vio_dring_reg_msg_t	*msg = pkt;
360 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
361 
362 	ncookies = msg->ncookies;
363 	num_desc = msg->num_descriptors;
364 	desc_size = msg->descriptor_size;
365 	bcopy(&msg->cookie[0], &dcookie, sizeof (ldc_mem_cookie_t));
366 
367 	/*
368 	 * Sanity check.
369 	 */
370 	if (num_desc < VGEN_NUM_DESCRIPTORS_MIN ||
371 	    desc_size < sizeof (vnet_public_desc_t)) {
372 		goto fail;
373 	}
374 
375 	/* Map the remote dring */
376 	rv = ldc_mem_dring_map(ldcp->ldc_handle, &dcookie, ncookies, num_desc,
377 	    desc_size, LDC_DIRECT_MAP, &(ldcp->rx_dring_handle));
378 	if (rv != 0) {
379 		goto fail;
380 	}
381 
382 	/*
383 	 * Sucessfully mapped, now try to get info about the mapped dring
384 	 */
385 	rv = ldc_mem_dring_info(ldcp->rx_dring_handle, &minfo);
386 	if (rv != 0) {
387 		goto fail;
388 	}
389 
390 	/*
391 	 * Save ring address, number of descriptors.
392 	 */
393 	ldcp->mrxdp = (vnet_public_desc_t *)(minfo.vaddr);
394 	bcopy(&dcookie, &(ldcp->rx_dring_cookie), sizeof (dcookie));
395 	ldcp->rx_dring_ncookies = ncookies;
396 	ldcp->num_rxds = num_desc;
397 
398 	/* Initialize rx dring indexes and seqnum */
399 	ldcp->next_rxi = 0;
400 	ldcp->next_rxseq = VNET_ISS;
401 	ldcp->dring_mtype = minfo.mtype;
402 
403 	/* Save peer's dring_info values */
404 	bcopy(&dcookie, &(ldcp->peer_hparams.dring_cookie),
405 	    sizeof (ldc_mem_cookie_t));
406 	ldcp->peer_hparams.num_desc = num_desc;
407 	ldcp->peer_hparams.desc_size = desc_size;
408 	ldcp->peer_hparams.dring_ncookies = ncookies;
409 
410 	/* Set dring_ident for the peer */
411 	ldcp->peer_hparams.dring_ident = (uint64_t)ldcp->txdp;
412 
413 	/* Return the dring_ident in ack msg */
414 	msg->dring_ident = (uint64_t)ldcp->txdp;
415 
416 	/* alloc rx mblk pools */
417 	rv = vgen_init_multipools(ldcp);
418 	if (rv != 0) {
419 		/*
420 		 * We do not return failure if receive mblk pools can't
421 		 * be allocated; instead allocb(9F) will be used to
422 		 * dynamically allocate buffers during receive.
423 		 */
424 		DWARN(vgenp, ldcp,
425 		    "vnet%d: failed to allocate rx mblk "
426 		    "pools for channel(0x%lx)\n",
427 		    vgenp->instance, ldcp->ldc_id);
428 	}
429 
430 	return (VGEN_SUCCESS);
431 
432 fail:
433 	if (ldcp->rx_dring_handle != 0) {
434 		(void) ldc_mem_dring_unmap(ldcp->rx_dring_handle);
435 		ldcp->rx_dring_handle = 0;
436 	}
437 	return (VGEN_FAILURE);
438 }
439 
440 /*
441  * Unmap the receive descriptor ring.
442  */
443 void
444 vgen_unmap_rx_dring(vgen_ldc_t *ldcp)
445 {
446 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
447 	vio_mblk_pool_t		*vmp = NULL;
448 
449 	/* Destroy receive mblk pools */
450 	vio_destroy_multipools(&ldcp->vmp, &vmp);
451 	if (vmp != NULL) {
452 		/*
453 		 * If we can't destroy the rx pool for this channel,
454 		 * dispatch a task to retry and clean up. Note that we
455 		 * don't need to wait for the task to complete. If the
456 		 * vnet device itself gets detached, it will wait for
457 		 * the task to complete implicitly in
458 		 * ddi_taskq_destroy().
459 		 */
460 		(void) ddi_taskq_dispatch(vgenp->rxp_taskq,
461 		    vgen_destroy_rxpools, vmp, DDI_SLEEP);
462 	}
463 
464 	/* Unmap peer's dring */
465 	if (ldcp->rx_dring_handle != 0) {
466 		(void) ldc_mem_dring_unmap(ldcp->rx_dring_handle);
467 		ldcp->rx_dring_handle = 0;
468 	}
469 
470 	/* clobber rx ring members */
471 	bzero(&ldcp->rx_dring_cookie, sizeof (ldcp->rx_dring_cookie));
472 	ldcp->mrxdp = NULL;
473 	ldcp->next_rxi = 0;
474 	ldcp->num_rxds = 0;
475 	ldcp->next_rxseq = VNET_ISS;
476 }
477 
478 /* Allocate receive resources */
479 static int
480 vgen_init_multipools(vgen_ldc_t *ldcp)
481 {
482 	size_t		data_sz;
483 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
484 	int		status;
485 	uint32_t	sz1 = 0;
486 	uint32_t	sz2 = 0;
487 	uint32_t	sz3 = 0;
488 	uint32_t	sz4 = 0;
489 
490 	/*
491 	 * We round up the mtu specified to be a multiple of 2K.
492 	 * We then create rx pools based on the rounded up size.
493 	 */
494 	data_sz = vgenp->max_frame_size + VNET_IPALIGN + VNET_LDCALIGN;
495 	data_sz = VNET_ROUNDUP_2K(data_sz);
496 
497 	/*
498 	 * If pool sizes are specified, use them. Note that the presence of
499 	 * the first tunable will be used as a hint.
500 	 */
501 	if (vgen_rbufsz1 != 0) {
502 
503 		sz1 = vgen_rbufsz1;
504 		sz2 = vgen_rbufsz2;
505 		sz3 = vgen_rbufsz3;
506 		sz4 = vgen_rbufsz4;
507 
508 		if (sz4 == 0) { /* need 3 pools */
509 
510 			ldcp->max_rxpool_size = sz3;
511 			status = vio_init_multipools(&ldcp->vmp,
512 			    VGEN_NUM_VMPOOLS, sz1, sz2, sz3, vgen_nrbufs1,
513 			    vgen_nrbufs2, vgen_nrbufs3);
514 
515 		} else {
516 
517 			ldcp->max_rxpool_size = sz4;
518 			status = vio_init_multipools(&ldcp->vmp,
519 			    VGEN_NUM_VMPOOLS + 1, sz1, sz2, sz3, sz4,
520 			    vgen_nrbufs1, vgen_nrbufs2, vgen_nrbufs3,
521 			    vgen_nrbufs4);
522 		}
523 		return (status);
524 	}
525 
526 	/*
527 	 * Pool sizes are not specified. We select the pool sizes based on the
528 	 * mtu if vnet_jumbo_rxpools is enabled.
529 	 */
530 	if (vnet_jumbo_rxpools == B_FALSE || data_sz == VNET_2K) {
531 		/*
532 		 * Receive buffer pool allocation based on mtu is disabled.
533 		 * Use the default mechanism of standard size pool allocation.
534 		 */
535 		sz1 = VGEN_DBLK_SZ_128;
536 		sz2 = VGEN_DBLK_SZ_256;
537 		sz3 = VGEN_DBLK_SZ_2048;
538 		ldcp->max_rxpool_size = sz3;
539 
540 		status = vio_init_multipools(&ldcp->vmp, VGEN_NUM_VMPOOLS,
541 		    sz1, sz2, sz3,
542 		    vgen_nrbufs1, vgen_nrbufs2, vgen_nrbufs3);
543 
544 		return (status);
545 	}
546 
547 	switch (data_sz) {
548 
549 	case VNET_4K:
550 
551 		sz1 = VGEN_DBLK_SZ_128;
552 		sz2 = VGEN_DBLK_SZ_256;
553 		sz3 = VGEN_DBLK_SZ_2048;
554 		sz4 = sz3 << 1;			/* 4K */
555 		ldcp->max_rxpool_size = sz4;
556 
557 		status = vio_init_multipools(&ldcp->vmp, VGEN_NUM_VMPOOLS + 1,
558 		    sz1, sz2, sz3, sz4,
559 		    vgen_nrbufs1, vgen_nrbufs2, vgen_nrbufs3, vgen_nrbufs4);
560 		break;
561 
562 	default:	/* data_sz:  4K+ to 16K */
563 
564 		sz1 = VGEN_DBLK_SZ_256;
565 		sz2 = VGEN_DBLK_SZ_2048;
566 		sz3 = data_sz >> 1;	/* Jumbo-size/2 */
567 		sz4 = data_sz;		/* Jumbo-size  */
568 		ldcp->max_rxpool_size = sz4;
569 
570 		status = vio_init_multipools(&ldcp->vmp, VGEN_NUM_VMPOOLS + 1,
571 		    sz1, sz2, sz3, sz4,
572 		    vgen_nrbufs1, vgen_nrbufs2, vgen_nrbufs3, vgen_nrbufs4);
573 		break;
574 
575 	}
576 
577 	return (status);
578 }
579 
580 /*
581  * This function transmits normal data frames (non-priority) over the channel.
582  * It queues the frame into the transmit descriptor ring and sends a
583  * VIO_DRING_DATA message if needed, to wake up the peer to (re)start
584  * processing.
585  */
586 int
587 vgen_dringsend(void *arg, mblk_t *mp)
588 {
589 	vgen_ldc_t		*ldcp = (vgen_ldc_t *)arg;
590 	vgen_private_desc_t	*tbufp;
591 	vgen_private_desc_t	*rtbufp;
592 	vnet_public_desc_t	*rtxdp;
593 	vgen_private_desc_t	*ntbufp;
594 	vnet_public_desc_t	*txdp;
595 	vio_dring_entry_hdr_t	*hdrp;
596 	vgen_stats_t		*statsp;
597 	struct ether_header	*ehp;
598 	boolean_t		is_bcast = B_FALSE;
599 	boolean_t		is_mcast = B_FALSE;
600 	size_t			mblksz;
601 	caddr_t			dst;
602 	mblk_t			*bp;
603 	size_t			size;
604 	int			rv = 0;
605 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
606 	vgen_hparams_t		*lp = &ldcp->local_hparams;
607 
608 	statsp = &ldcp->stats;
609 	size = msgsize(mp);
610 
611 	DBG1(vgenp, ldcp, "enter\n");
612 
613 	if (ldcp->ldc_status != LDC_UP) {
614 		DWARN(vgenp, ldcp, "status(%d), dropping packet\n",
615 		    ldcp->ldc_status);
616 		goto dringsend_exit;
617 	}
618 
619 	/* drop the packet if ldc is not up or handshake is not done */
620 	if (ldcp->hphase != VH_DONE) {
621 		DWARN(vgenp, ldcp, "hphase(%x), dropping packet\n",
622 		    ldcp->hphase);
623 		goto dringsend_exit;
624 	}
625 
626 	if (size > (size_t)lp->mtu) {
627 		DWARN(vgenp, ldcp, "invalid size(%d)\n", size);
628 		goto dringsend_exit;
629 	}
630 	if (size < ETHERMIN)
631 		size = ETHERMIN;
632 
633 	ehp = (struct ether_header *)mp->b_rptr;
634 	is_bcast = IS_BROADCAST(ehp);
635 	is_mcast = IS_MULTICAST(ehp);
636 
637 	mutex_enter(&ldcp->txlock);
638 	/*
639 	 * allocate a descriptor
640 	 */
641 	tbufp = ldcp->next_tbufp;
642 	ntbufp = NEXTTBUF(ldcp, tbufp);
643 	if (ntbufp == ldcp->cur_tbufp) { /* out of tbufs/txds */
644 
645 		mutex_enter(&ldcp->tclock);
646 		/* Try reclaiming now */
647 		vgen_reclaim_dring(ldcp);
648 		ldcp->reclaim_lbolt = ddi_get_lbolt();
649 
650 		if (ntbufp == ldcp->cur_tbufp) {
651 			/* Now we are really out of tbuf/txds */
652 			ldcp->tx_blocked_lbolt = ddi_get_lbolt();
653 			ldcp->tx_blocked = B_TRUE;
654 			mutex_exit(&ldcp->tclock);
655 
656 			statsp->tx_no_desc++;
657 			mutex_exit(&ldcp->txlock);
658 
659 			return (VGEN_TX_NORESOURCES);
660 		}
661 		mutex_exit(&ldcp->tclock);
662 	}
663 	/* update next available tbuf in the ring and update tx index */
664 	ldcp->next_tbufp = ntbufp;
665 	INCR_TXI(ldcp->next_txi, ldcp);
666 
667 	/* Mark the buffer busy before releasing the lock */
668 	tbufp->flags = VGEN_PRIV_DESC_BUSY;
669 	mutex_exit(&ldcp->txlock);
670 
671 	/* copy data into pre-allocated transmit buffer */
672 	dst = tbufp->datap + VNET_IPALIGN;
673 	for (bp = mp; bp != NULL; bp = bp->b_cont) {
674 		mblksz = MBLKL(bp);
675 		bcopy(bp->b_rptr, dst, mblksz);
676 		dst += mblksz;
677 	}
678 
679 	tbufp->datalen = size;
680 
681 	/* initialize the corresponding public descriptor (txd) */
682 	txdp = tbufp->descp;
683 	hdrp = &txdp->hdr;
684 	txdp->nbytes = size;
685 	txdp->ncookies = tbufp->ncookies;
686 	bcopy((tbufp->memcookie), (txdp->memcookie),
687 	    tbufp->ncookies * sizeof (ldc_mem_cookie_t));
688 
689 	mutex_enter(&ldcp->wrlock);
690 	/*
691 	 * If the flags not set to BUSY, it implies that the clobber
692 	 * was done while we were copying the data. In such case,
693 	 * discard the packet and return.
694 	 */
695 	if (tbufp->flags != VGEN_PRIV_DESC_BUSY) {
696 		statsp->oerrors++;
697 		mutex_exit(&ldcp->wrlock);
698 		goto dringsend_exit;
699 	}
700 	hdrp->dstate = VIO_DESC_READY;
701 
702 	/* update stats */
703 	statsp->opackets++;
704 	statsp->obytes += size;
705 	if (is_bcast)
706 		statsp->brdcstxmt++;
707 	else if (is_mcast)
708 		statsp->multixmt++;
709 
710 	/* send dring datamsg to the peer */
711 	if (ldcp->resched_peer) {
712 
713 		rtbufp = &ldcp->tbufp[ldcp->resched_peer_txi];
714 		rtxdp = rtbufp->descp;
715 
716 		if (rtxdp->hdr.dstate == VIO_DESC_READY) {
717 			rv = vgen_send_dringdata(ldcp,
718 			    (uint32_t)ldcp->resched_peer_txi, -1);
719 			if (rv != 0) {
720 				/* error: drop the packet */
721 				DWARN(vgenp, ldcp,
722 				    "failed sending dringdata msg "
723 				    "rv(%d) len(%d)\n", rv, size);
724 				statsp->oerrors++;
725 			} else {
726 				ldcp->resched_peer = B_FALSE;
727 			}
728 
729 		}
730 
731 	}
732 
733 	mutex_exit(&ldcp->wrlock);
734 
735 dringsend_exit:
736 	if (rv == ECONNRESET) {
737 		(void) vgen_handle_evt_reset(ldcp, VGEN_OTHER);
738 	}
739 	freemsg(mp);
740 	DBG1(vgenp, ldcp, "exit\n");
741 	return (VGEN_TX_SUCCESS);
742 }
743 
744 mblk_t *
745 vgen_poll_rcv(vgen_ldc_t *ldcp, int bytes_to_pickup)
746 {
747 	mblk_t	*bp = NULL;
748 	mblk_t	*bpt = NULL;
749 	mblk_t	*mp = NULL;
750 	size_t	mblk_sz = 0;
751 	size_t	sz = 0;
752 	uint_t	count = 0;
753 
754 	mutex_enter(&ldcp->pollq_lock);
755 
756 	bp = ldcp->pollq_headp;
757 	while (bp != NULL) {
758 		/* get the size of this packet */
759 		mblk_sz = msgdsize(bp);
760 
761 		/* if adding this pkt, exceeds the size limit, we are done. */
762 		if (sz + mblk_sz >  bytes_to_pickup) {
763 			break;
764 		}
765 
766 		/* we have room for this packet */
767 		sz += mblk_sz;
768 
769 		/* increment the # of packets being sent up */
770 		count++;
771 
772 		/* track the last processed pkt */
773 		bpt = bp;
774 
775 		/* get the next pkt */
776 		bp = bp->b_next;
777 	}
778 
779 	if (count != 0) {
780 		/*
781 		 * picked up some packets; save the head of pkts to be sent up.
782 		 */
783 		mp = ldcp->pollq_headp;
784 
785 		/* move the pollq_headp to skip over the pkts being sent up */
786 		ldcp->pollq_headp = bp;
787 
788 		/* picked up all pending pkts in the queue; reset tail also */
789 		if (ldcp->pollq_headp == NULL) {
790 			ldcp->pollq_tailp = NULL;
791 		}
792 
793 		/* terminate the tail of pkts to be sent up */
794 		bpt->b_next = NULL;
795 	}
796 
797 	/*
798 	 * We prepend any high priority packets to the chain of packets; note
799 	 * that if we are already at the bytes_to_pickup limit, we might
800 	 * slightly exceed that in such cases. That should be ok, as these pkts
801 	 * are expected to be small in size and arrive at an interval in the
802 	 * the order of a few seconds.
803 	 */
804 	if (ldcp->rx_pktdata == vgen_handle_pkt_data &&
805 	    ldcp->rx_pri_head != NULL) {
806 		ldcp->rx_pri_tail->b_next = mp;
807 		mp = ldcp->rx_pri_head;
808 		ldcp->rx_pri_head = ldcp->rx_pri_tail = NULL;
809 	}
810 
811 	mutex_exit(&ldcp->pollq_lock);
812 
813 	return (mp);
814 }
815 
816 /*
817  * Process dring data messages (info/ack/nack)
818  */
819 int
820 vgen_handle_dringdata(void *arg1, void *arg2)
821 {
822 	vgen_ldc_t	*ldcp = (vgen_ldc_t *)arg1;
823 	vio_msg_tag_t	*tagp = (vio_msg_tag_t *)arg2;
824 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
825 	int		rv = 0;
826 
827 	DBG1(vgenp, ldcp, "enter\n");
828 	switch (tagp->vio_subtype) {
829 
830 	case VIO_SUBTYPE_INFO:
831 		/*
832 		 * To reduce the locking contention, release the
833 		 * cblock here and re-acquire it once we are done
834 		 * receiving packets.
835 		 */
836 		mutex_exit(&ldcp->cblock);
837 		mutex_enter(&ldcp->rxlock);
838 		rv = vgen_handle_dringdata_info(ldcp, tagp);
839 		mutex_exit(&ldcp->rxlock);
840 		mutex_enter(&ldcp->cblock);
841 		break;
842 
843 	case VIO_SUBTYPE_ACK:
844 		rv = vgen_handle_dringdata_ack(ldcp, tagp);
845 		break;
846 
847 	case VIO_SUBTYPE_NACK:
848 		rv = vgen_handle_dringdata_nack(ldcp, tagp);
849 		break;
850 	}
851 	DBG1(vgenp, ldcp, "exit rv(%d)\n", rv);
852 	return (rv);
853 }
854 
855 static int
856 vgen_handle_dringdata_info(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp)
857 {
858 	uint32_t	start;
859 	int32_t		end;
860 	int		rv = 0;
861 	vio_dring_msg_t	*dringmsg = (vio_dring_msg_t *)tagp;
862 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
863 	vgen_stats_t	*statsp = &ldcp->stats;
864 #ifdef VGEN_HANDLE_LOST_PKTS
865 	uint32_t	rxi;
866 	int		n;
867 #endif
868 
869 	DBG1(vgenp, ldcp, "enter\n");
870 
871 	start = dringmsg->start_idx;
872 	end = dringmsg->end_idx;
873 	/*
874 	 * received a data msg, which contains the start and end
875 	 * indices of the descriptors within the rx ring holding data,
876 	 * the seq_num of data packet corresponding to the start index,
877 	 * and the dring_ident.
878 	 * We can now read the contents of each of these descriptors
879 	 * and gather data from it.
880 	 */
881 	DBG1(vgenp, ldcp, "INFO: start(%d), end(%d)\n",
882 	    start, end);
883 
884 	/* validate rx start and end indexes */
885 	if (!(CHECK_RXI(start, ldcp)) || ((end != -1) &&
886 	    !(CHECK_RXI(end, ldcp)))) {
887 		DWARN(vgenp, ldcp, "Invalid Rx start(%d) or end(%d)\n",
888 		    start, end);
889 		/* drop the message if invalid index */
890 		return (rv);
891 	}
892 
893 	/* validate dring_ident */
894 	if (dringmsg->dring_ident != ldcp->peer_hparams.dring_ident) {
895 		DWARN(vgenp, ldcp, "Invalid dring ident 0x%x\n",
896 		    dringmsg->dring_ident);
897 		/* invalid dring_ident, drop the msg */
898 		return (rv);
899 	}
900 #ifdef DEBUG
901 	if (vgen_inject_error(ldcp, VGEN_ERR_RXLOST)) {
902 		/* drop this msg to simulate lost pkts for debugging */
903 		vgen_inject_err_flag &= ~(VGEN_ERR_RXLOST);
904 		return (rv);
905 	}
906 #endif
907 
908 	statsp->dring_data_msgs_rcvd++;
909 
910 #ifdef	VGEN_HANDLE_LOST_PKTS
911 
912 	/* receive start index doesn't match expected index */
913 	if (ldcp->next_rxi != start) {
914 		DWARN(vgenp, ldcp, "next_rxi(%d) != start(%d)\n",
915 		    ldcp->next_rxi, start);
916 
917 		/* calculate the number of pkts lost */
918 		if (start >= ldcp->next_rxi) {
919 			n = start - ldcp->next_rxi;
920 		} else  {
921 			n = ldcp->num_rxds - (ldcp->next_rxi - start);
922 		}
923 
924 		statsp->rx_lost_pkts += n;
925 		tagp->vio_subtype = VIO_SUBTYPE_NACK;
926 		tagp->vio_sid = ldcp->local_sid;
927 		/* indicate the range of lost descriptors */
928 		dringmsg->start_idx = ldcp->next_rxi;
929 		rxi = start;
930 		DECR_RXI(rxi, ldcp);
931 		dringmsg->end_idx = rxi;
932 		/* dring ident is left unchanged */
933 		rv = vgen_sendmsg(ldcp, (caddr_t)tagp,
934 		    sizeof (*dringmsg), B_FALSE);
935 		if (rv != VGEN_SUCCESS) {
936 			DWARN(vgenp, ldcp,
937 			    "vgen_sendmsg failed, stype:NACK\n");
938 			return (rv);
939 		}
940 		/*
941 		 * treat this range of descrs/pkts as dropped
942 		 * and set the new expected value of next_rxi
943 		 * and continue(below) to process from the new
944 		 * start index.
945 		 */
946 		ldcp->next_rxi = start;
947 	}
948 
949 #endif	/* VGEN_HANDLE_LOST_PKTS */
950 
951 	/* Now receive messages */
952 	rv = vgen_process_dringdata(ldcp, tagp);
953 
954 	DBG1(vgenp, ldcp, "exit rv(%d)\n", rv);
955 	return (rv);
956 }
957 
958 static int
959 vgen_process_dringdata(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp)
960 {
961 	boolean_t		set_ack_start = B_FALSE;
962 	uint32_t		start;
963 	uint32_t		ack_end;
964 	uint32_t		next_rxi;
965 	uint32_t		rxi;
966 	int			count = 0;
967 	int			rv = 0;
968 	uint32_t		retries = 0;
969 	vgen_stats_t		*statsp;
970 	vnet_public_desc_t	rxd;
971 	vio_dring_entry_hdr_t	*hdrp;
972 	mblk_t 			*bp = NULL;
973 	mblk_t 			*bpt = NULL;
974 	uint32_t		ack_start;
975 	boolean_t		rxd_err = B_FALSE;
976 	mblk_t			*mp = NULL;
977 	vio_mblk_t		*vmp = NULL;
978 	size_t			nbytes;
979 	boolean_t		ack_needed = B_FALSE;
980 	size_t			nread;
981 	uint64_t		off = 0;
982 	struct ether_header	*ehp;
983 	vio_dring_msg_t		*dringmsg = (vio_dring_msg_t *)tagp;
984 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
985 	vgen_hparams_t		*lp = &ldcp->local_hparams;
986 
987 	DBG1(vgenp, ldcp, "enter\n");
988 
989 	statsp = &ldcp->stats;
990 	start = dringmsg->start_idx;
991 
992 	/*
993 	 * start processing the descriptors from the specified
994 	 * start index, up to the index a descriptor is not ready
995 	 * to be processed or we process the entire descriptor ring
996 	 * and wrap around upto the start index.
997 	 */
998 
999 	/* need to set the start index of descriptors to be ack'd */
1000 	set_ack_start = B_TRUE;
1001 
1002 	/* index upto which we have ack'd */
1003 	ack_end = start;
1004 	DECR_RXI(ack_end, ldcp);
1005 
1006 	next_rxi = rxi =  start;
1007 	do {
1008 vgen_recv_retry:
1009 		rv = vnet_dring_entry_copy(&(ldcp->mrxdp[rxi]), &rxd,
1010 		    ldcp->dring_mtype, ldcp->rx_dring_handle, rxi, rxi);
1011 		if (rv != 0) {
1012 			DWARN(vgenp, ldcp, "ldc_mem_dring_acquire() failed"
1013 			    " rv(%d)\n", rv);
1014 			statsp->ierrors++;
1015 			return (rv);
1016 		}
1017 
1018 		hdrp = &rxd.hdr;
1019 
1020 		if (hdrp->dstate != VIO_DESC_READY) {
1021 			/*
1022 			 * Before waiting and retry here, send up
1023 			 * the packets that are received already
1024 			 */
1025 			if (bp != NULL) {
1026 				DTRACE_PROBE1(vgen_rcv_msgs, int, count);
1027 				vgen_rx(ldcp, bp, bpt);
1028 				count = 0;
1029 				bp = bpt = NULL;
1030 			}
1031 			/*
1032 			 * descriptor is not ready.
1033 			 * retry descriptor acquire, stop processing
1034 			 * after max # retries.
1035 			 */
1036 			if (retries == vgen_recv_retries)
1037 				break;
1038 			retries++;
1039 			drv_usecwait(vgen_recv_delay);
1040 			goto vgen_recv_retry;
1041 		}
1042 		retries = 0;
1043 
1044 		if (set_ack_start) {
1045 			/*
1046 			 * initialize the start index of the range
1047 			 * of descriptors to be ack'd.
1048 			 */
1049 			ack_start = rxi;
1050 			set_ack_start = B_FALSE;
1051 		}
1052 
1053 		if ((rxd.nbytes < ETHERMIN) ||
1054 		    (rxd.nbytes > lp->mtu) ||
1055 		    (rxd.ncookies == 0) ||
1056 		    (rxd.ncookies > MAX_COOKIES)) {
1057 			rxd_err = B_TRUE;
1058 		} else {
1059 			/*
1060 			 * Try to allocate an mblk from the free pool
1061 			 * of recv mblks for the channel.
1062 			 * If this fails, use allocb().
1063 			 */
1064 			nbytes = (VNET_IPALIGN + rxd.nbytes + 7) & ~7;
1065 			if (nbytes > ldcp->max_rxpool_size) {
1066 				mp = allocb(VNET_IPALIGN + rxd.nbytes + 8,
1067 				    BPRI_MED);
1068 				vmp = NULL;
1069 			} else {
1070 				vmp = vio_multipool_allocb(&ldcp->vmp, nbytes);
1071 				if (vmp == NULL) {
1072 					statsp->rx_vio_allocb_fail++;
1073 					/*
1074 					 * Data buffer returned by allocb(9F)
1075 					 * is 8byte aligned. We allocate extra
1076 					 * 8 bytes to ensure size is multiple
1077 					 * of 8 bytes for ldc_mem_copy().
1078 					 */
1079 					mp = allocb(VNET_IPALIGN +
1080 					    rxd.nbytes + 8, BPRI_MED);
1081 				} else {
1082 					mp = vmp->mp;
1083 				}
1084 			}
1085 		}
1086 		if ((rxd_err) || (mp == NULL)) {
1087 			/*
1088 			 * rxd_err or allocb() failure,
1089 			 * drop this packet, get next.
1090 			 */
1091 			if (rxd_err) {
1092 				statsp->ierrors++;
1093 				rxd_err = B_FALSE;
1094 			} else {
1095 				statsp->rx_allocb_fail++;
1096 			}
1097 
1098 			ack_needed = hdrp->ack;
1099 
1100 			/* set descriptor done bit */
1101 			rv = vnet_dring_entry_set_dstate(&(ldcp->mrxdp[rxi]),
1102 			    ldcp->dring_mtype, ldcp->rx_dring_handle, rxi, rxi,
1103 			    VIO_DESC_DONE);
1104 			if (rv != 0) {
1105 				DWARN(vgenp, ldcp,
1106 				    "vnet_dring_entry_set_dstate err rv(%d)\n",
1107 				    rv);
1108 				return (rv);
1109 			}
1110 
1111 			if (ack_needed) {
1112 				ack_needed = B_FALSE;
1113 				/*
1114 				 * sender needs ack for this packet,
1115 				 * ack pkts upto this index.
1116 				 */
1117 				ack_end = rxi;
1118 
1119 				rv = vgen_send_dringack(ldcp, tagp,
1120 				    ack_start, ack_end,
1121 				    VIO_DP_ACTIVE);
1122 				if (rv != VGEN_SUCCESS) {
1123 					goto error_ret;
1124 				}
1125 
1126 				/* need to set new ack start index */
1127 				set_ack_start = B_TRUE;
1128 			}
1129 			goto vgen_next_rxi;
1130 		}
1131 
1132 		nread = nbytes;
1133 		rv = ldc_mem_copy(ldcp->ldc_handle,
1134 		    (caddr_t)mp->b_rptr, off, &nread,
1135 		    rxd.memcookie, rxd.ncookies, LDC_COPY_IN);
1136 
1137 		/* if ldc_mem_copy() failed */
1138 		if (rv) {
1139 			DWARN(vgenp, ldcp, "ldc_mem_copy err rv(%d)\n", rv);
1140 			statsp->ierrors++;
1141 			freemsg(mp);
1142 			goto error_ret;
1143 		}
1144 
1145 		ack_needed = hdrp->ack;
1146 
1147 		rv = vnet_dring_entry_set_dstate(&(ldcp->mrxdp[rxi]),
1148 		    ldcp->dring_mtype, ldcp->rx_dring_handle, rxi, rxi,
1149 		    VIO_DESC_DONE);
1150 		if (rv != 0) {
1151 			DWARN(vgenp, ldcp,
1152 			    "vnet_dring_entry_set_dstate err rv(%d)\n", rv);
1153 			freemsg(mp);
1154 			goto error_ret;
1155 		}
1156 
1157 		mp->b_rptr += VNET_IPALIGN;
1158 
1159 		if (ack_needed) {
1160 			ack_needed = B_FALSE;
1161 			/*
1162 			 * sender needs ack for this packet,
1163 			 * ack pkts upto this index.
1164 			 */
1165 			ack_end = rxi;
1166 
1167 			rv = vgen_send_dringack(ldcp, tagp,
1168 			    ack_start, ack_end, VIO_DP_ACTIVE);
1169 			if (rv != VGEN_SUCCESS) {
1170 				freemsg(mp);
1171 				goto error_ret;
1172 			}
1173 
1174 			/* need to set new ack start index */
1175 			set_ack_start = B_TRUE;
1176 		}
1177 
1178 		if (nread != nbytes) {
1179 			DWARN(vgenp, ldcp,
1180 			    "ldc_mem_copy nread(%lx), nbytes(%lx)\n",
1181 			    nread, nbytes);
1182 			statsp->ierrors++;
1183 			freemsg(mp);
1184 			goto vgen_next_rxi;
1185 		}
1186 
1187 		/* point to the actual end of data */
1188 		mp->b_wptr = mp->b_rptr + rxd.nbytes;
1189 
1190 		if (vmp != NULL) {
1191 			vmp->state = VIO_MBLK_HAS_DATA;
1192 		}
1193 
1194 		/* update stats */
1195 		statsp->ipackets++;
1196 		statsp->rbytes += rxd.nbytes;
1197 		ehp = (struct ether_header *)mp->b_rptr;
1198 		if (IS_BROADCAST(ehp))
1199 			statsp->brdcstrcv++;
1200 		else if (IS_MULTICAST(ehp))
1201 			statsp->multircv++;
1202 
1203 		/* build a chain of received packets */
1204 		if (bp == NULL) {
1205 			/* first pkt */
1206 			bp = mp;
1207 			bpt = bp;
1208 			bpt->b_next = NULL;
1209 		} else {
1210 			mp->b_next = NULL;
1211 			bpt->b_next = mp;
1212 			bpt = mp;
1213 		}
1214 
1215 		if (count++ > vgen_chain_len) {
1216 			DTRACE_PROBE1(vgen_rcv_msgs, int, count);
1217 			vgen_rx(ldcp, bp, bpt);
1218 			count = 0;
1219 			bp = bpt = NULL;
1220 		}
1221 
1222 vgen_next_rxi:
1223 		/* update end index of range of descrs to be ack'd */
1224 		ack_end = rxi;
1225 
1226 		/* update the next index to be processed */
1227 		INCR_RXI(next_rxi, ldcp);
1228 		if (next_rxi == start) {
1229 			/*
1230 			 * processed the entire descriptor ring upto
1231 			 * the index at which we started.
1232 			 */
1233 			break;
1234 		}
1235 
1236 		rxi = next_rxi;
1237 
1238 	_NOTE(CONSTCOND)
1239 	} while (1);
1240 
1241 	/*
1242 	 * send an ack message to peer indicating that we have stopped
1243 	 * processing descriptors.
1244 	 */
1245 	if (set_ack_start) {
1246 		/*
1247 		 * We have ack'd upto some index and we have not
1248 		 * processed any descriptors beyond that index.
1249 		 * Use the last ack'd index as both the start and
1250 		 * end of range of descrs being ack'd.
1251 		 * Note: This results in acking the last index twice
1252 		 * and should be harmless.
1253 		 */
1254 		ack_start = ack_end;
1255 	}
1256 
1257 	rv = vgen_send_dringack(ldcp, tagp, ack_start, ack_end,
1258 	    VIO_DP_STOPPED);
1259 	if (rv != VGEN_SUCCESS) {
1260 		goto error_ret;
1261 	}
1262 
1263 	/* save new recv index of next dring msg */
1264 	ldcp->next_rxi = next_rxi;
1265 
1266 error_ret:
1267 	/* send up packets received so far */
1268 	if (bp != NULL) {
1269 		DTRACE_PROBE1(vgen_rcv_msgs, int, count);
1270 		vgen_rx(ldcp, bp, bpt);
1271 		bp = bpt = NULL;
1272 	}
1273 	DBG1(vgenp, ldcp, "exit rv(%d)\n", rv);
1274 	return (rv);
1275 
1276 }
1277 
1278 static int
1279 vgen_handle_dringdata_ack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp)
1280 {
1281 	int			rv = 0;
1282 	uint32_t		start;
1283 	int32_t			end;
1284 	uint32_t		txi;
1285 	boolean_t		ready_txd = B_FALSE;
1286 	vgen_stats_t		*statsp;
1287 	vgen_private_desc_t	*tbufp;
1288 	vnet_public_desc_t	*txdp;
1289 	vio_dring_entry_hdr_t	*hdrp;
1290 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
1291 	vio_dring_msg_t		*dringmsg = (vio_dring_msg_t *)tagp;
1292 
1293 	DBG1(vgenp, ldcp, "enter\n");
1294 	start = dringmsg->start_idx;
1295 	end = dringmsg->end_idx;
1296 	statsp = &ldcp->stats;
1297 
1298 	/*
1299 	 * received an ack corresponding to a specific descriptor for
1300 	 * which we had set the ACK bit in the descriptor (during
1301 	 * transmit). This enables us to reclaim descriptors.
1302 	 */
1303 
1304 	DBG2(vgenp, ldcp, "ACK:  start(%d), end(%d)\n", start, end);
1305 
1306 	/* validate start and end indexes in the tx ack msg */
1307 	if (!(CHECK_TXI(start, ldcp)) || !(CHECK_TXI(end, ldcp))) {
1308 		/* drop the message if invalid index */
1309 		DWARN(vgenp, ldcp, "Invalid Tx ack start(%d) or end(%d)\n",
1310 		    start, end);
1311 		return (rv);
1312 	}
1313 	/* validate dring_ident */
1314 	if (dringmsg->dring_ident != ldcp->local_hparams.dring_ident) {
1315 		/* invalid dring_ident, drop the msg */
1316 		DWARN(vgenp, ldcp, "Invalid dring ident 0x%x\n",
1317 		    dringmsg->dring_ident);
1318 		return (rv);
1319 	}
1320 	statsp->dring_data_acks_rcvd++;
1321 
1322 	/* reclaim descriptors that are done */
1323 	vgen_reclaim(ldcp);
1324 
1325 	if (dringmsg->dring_process_state != VIO_DP_STOPPED) {
1326 		/*
1327 		 * receiver continued processing descriptors after
1328 		 * sending us the ack.
1329 		 */
1330 		return (rv);
1331 	}
1332 
1333 	statsp->dring_stopped_acks_rcvd++;
1334 
1335 	/* receiver stopped processing descriptors */
1336 	mutex_enter(&ldcp->wrlock);
1337 	mutex_enter(&ldcp->tclock);
1338 
1339 	/*
1340 	 * determine if there are any pending tx descriptors
1341 	 * ready to be processed by the receiver(peer) and if so,
1342 	 * send a message to the peer to restart receiving.
1343 	 */
1344 	ready_txd = B_FALSE;
1345 
1346 	/*
1347 	 * using the end index of the descriptor range for which
1348 	 * we received the ack, check if the next descriptor is
1349 	 * ready.
1350 	 */
1351 	txi = end;
1352 	INCR_TXI(txi, ldcp);
1353 	tbufp = &ldcp->tbufp[txi];
1354 	txdp = tbufp->descp;
1355 	hdrp = &txdp->hdr;
1356 	if (hdrp->dstate == VIO_DESC_READY) {
1357 		ready_txd = B_TRUE;
1358 	} else {
1359 		/*
1360 		 * descr next to the end of ack'd descr range is not
1361 		 * ready.
1362 		 * starting from the current reclaim index, check
1363 		 * if any descriptor is ready.
1364 		 */
1365 
1366 		txi = ldcp->cur_tbufp - ldcp->tbufp;
1367 		tbufp = &ldcp->tbufp[txi];
1368 
1369 		txdp = tbufp->descp;
1370 		hdrp = &txdp->hdr;
1371 		if (hdrp->dstate == VIO_DESC_READY) {
1372 			ready_txd = B_TRUE;
1373 		}
1374 
1375 	}
1376 
1377 	if (ready_txd) {
1378 		/*
1379 		 * we have tx descriptor(s) ready to be
1380 		 * processed by the receiver.
1381 		 * send a message to the peer with the start index
1382 		 * of ready descriptors.
1383 		 */
1384 		rv = vgen_send_dringdata(ldcp, txi, -1);
1385 		if (rv != VGEN_SUCCESS) {
1386 			ldcp->resched_peer = B_TRUE;
1387 			ldcp->resched_peer_txi = txi;
1388 			mutex_exit(&ldcp->tclock);
1389 			mutex_exit(&ldcp->wrlock);
1390 			return (rv);
1391 		}
1392 	} else {
1393 		/*
1394 		 * no ready tx descriptors. set the flag to send a
1395 		 * message to peer when tx descriptors are ready in
1396 		 * transmit routine.
1397 		 */
1398 		ldcp->resched_peer = B_TRUE;
1399 		ldcp->resched_peer_txi = ldcp->cur_tbufp - ldcp->tbufp;
1400 	}
1401 
1402 	mutex_exit(&ldcp->tclock);
1403 	mutex_exit(&ldcp->wrlock);
1404 	DBG1(vgenp, ldcp, "exit rv(%d)\n", rv);
1405 	return (rv);
1406 }
1407 
1408 static int
1409 vgen_handle_dringdata_nack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp)
1410 {
1411 	int			rv = 0;
1412 	uint32_t		start;
1413 	int32_t			end;
1414 	uint32_t		txi;
1415 	vnet_public_desc_t	*txdp;
1416 	vio_dring_entry_hdr_t	*hdrp;
1417 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
1418 	vio_dring_msg_t		*dringmsg = (vio_dring_msg_t *)tagp;
1419 
1420 	DBG1(vgenp, ldcp, "enter\n");
1421 	start = dringmsg->start_idx;
1422 	end = dringmsg->end_idx;
1423 
1424 	/*
1425 	 * peer sent a NACK msg to indicate lost packets.
1426 	 * The start and end correspond to the range of descriptors
1427 	 * for which the peer didn't receive a dring data msg and so
1428 	 * didn't receive the corresponding data.
1429 	 */
1430 	DWARN(vgenp, ldcp, "NACK: start(%d), end(%d)\n", start, end);
1431 
1432 	/* validate start and end indexes in the tx nack msg */
1433 	if (!(CHECK_TXI(start, ldcp)) || !(CHECK_TXI(end, ldcp))) {
1434 		/* drop the message if invalid index */
1435 		DWARN(vgenp, ldcp, "Invalid Tx nack start(%d) or end(%d)\n",
1436 		    start, end);
1437 		return (rv);
1438 	}
1439 	/* validate dring_ident */
1440 	if (dringmsg->dring_ident != ldcp->local_hparams.dring_ident) {
1441 		/* invalid dring_ident, drop the msg */
1442 		DWARN(vgenp, ldcp, "Invalid dring ident 0x%x\n",
1443 		    dringmsg->dring_ident);
1444 		return (rv);
1445 	}
1446 	mutex_enter(&ldcp->txlock);
1447 	mutex_enter(&ldcp->tclock);
1448 
1449 	if (ldcp->next_tbufp == ldcp->cur_tbufp) {
1450 		/* no busy descriptors, bogus nack ? */
1451 		mutex_exit(&ldcp->tclock);
1452 		mutex_exit(&ldcp->txlock);
1453 		return (rv);
1454 	}
1455 
1456 	/* we just mark the descrs as done so they can be reclaimed */
1457 	for (txi = start; txi <= end; ) {
1458 		txdp = &(ldcp->txdp[txi]);
1459 		hdrp = &txdp->hdr;
1460 		if (hdrp->dstate == VIO_DESC_READY)
1461 			hdrp->dstate = VIO_DESC_DONE;
1462 		INCR_TXI(txi, ldcp);
1463 	}
1464 	mutex_exit(&ldcp->tclock);
1465 	mutex_exit(&ldcp->txlock);
1466 	DBG1(vgenp, ldcp, "exit rv(%d)\n", rv);
1467 	return (rv);
1468 }
1469 
1470 /*
1471  * Send received packets up the stack.
1472  */
1473 static void
1474 vgen_rx(vgen_ldc_t *ldcp, mblk_t *bp, mblk_t *bpt)
1475 {
1476 	vio_net_rx_cb_t vrx_cb = ldcp->portp->vcb.vio_net_rx_cb;
1477 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
1478 
1479 	if (ldcp->msg_thread != NULL) {
1480 		ASSERT(MUTEX_HELD(&ldcp->rxlock));
1481 	} else {
1482 		ASSERT(MUTEX_HELD(&ldcp->cblock));
1483 	}
1484 
1485 	mutex_enter(&ldcp->pollq_lock);
1486 
1487 	if (ldcp->polling_on == B_TRUE) {
1488 		/*
1489 		 * If we are in polling mode, simply queue
1490 		 * the packets onto the poll queue and return.
1491 		 */
1492 		if (ldcp->pollq_headp == NULL) {
1493 			ldcp->pollq_headp = bp;
1494 			ldcp->pollq_tailp = bpt;
1495 		} else {
1496 			ldcp->pollq_tailp->b_next = bp;
1497 			ldcp->pollq_tailp = bpt;
1498 		}
1499 
1500 		mutex_exit(&ldcp->pollq_lock);
1501 		return;
1502 	}
1503 
1504 	/*
1505 	 * Prepend any pending mblks in the poll queue, now that we
1506 	 * are in interrupt mode, before sending up the chain of pkts.
1507 	 */
1508 	if (ldcp->pollq_headp != NULL) {
1509 		DBG2(vgenp, ldcp, "vgen_rx(%lx), pending pollq_headp\n",
1510 		    (uintptr_t)ldcp);
1511 		ldcp->pollq_tailp->b_next = bp;
1512 		bp = ldcp->pollq_headp;
1513 		ldcp->pollq_headp = ldcp->pollq_tailp = NULL;
1514 	}
1515 
1516 	mutex_exit(&ldcp->pollq_lock);
1517 
1518 	if (ldcp->msg_thread != NULL) {
1519 		mutex_exit(&ldcp->rxlock);
1520 	} else {
1521 		mutex_exit(&ldcp->cblock);
1522 	}
1523 
1524 	/* Send up the packets */
1525 	vrx_cb(ldcp->portp->vhp, bp);
1526 
1527 	if (ldcp->msg_thread != NULL) {
1528 		mutex_enter(&ldcp->rxlock);
1529 	} else {
1530 		mutex_enter(&ldcp->cblock);
1531 	}
1532 }
1533 
1534 static void
1535 vgen_reclaim(vgen_ldc_t *ldcp)
1536 {
1537 	mutex_enter(&ldcp->tclock);
1538 	vgen_reclaim_dring(ldcp);
1539 	ldcp->reclaim_lbolt = ddi_get_lbolt();
1540 	mutex_exit(&ldcp->tclock);
1541 }
1542 
1543 /*
1544  * transmit reclaim function. starting from the current reclaim index
1545  * look for descriptors marked DONE and reclaim the descriptor.
1546  */
1547 static void
1548 vgen_reclaim_dring(vgen_ldc_t *ldcp)
1549 {
1550 	int			count = 0;
1551 	vnet_public_desc_t	*txdp;
1552 	vgen_private_desc_t	*tbufp;
1553 	vio_dring_entry_hdr_t	*hdrp;
1554 
1555 	tbufp = ldcp->cur_tbufp;
1556 	txdp = tbufp->descp;
1557 	hdrp = &txdp->hdr;
1558 
1559 	while ((hdrp->dstate == VIO_DESC_DONE) &&
1560 	    (tbufp != ldcp->next_tbufp)) {
1561 		tbufp->flags = VGEN_PRIV_DESC_FREE;
1562 		hdrp->dstate = VIO_DESC_FREE;
1563 		hdrp->ack = B_FALSE;
1564 
1565 		tbufp = NEXTTBUF(ldcp, tbufp);
1566 		txdp = tbufp->descp;
1567 		hdrp = &txdp->hdr;
1568 		count++;
1569 	}
1570 
1571 	ldcp->cur_tbufp = tbufp;
1572 
1573 	/*
1574 	 * Check if mac layer should be notified to restart transmissions
1575 	 */
1576 	if ((ldcp->tx_blocked) && (count > 0)) {
1577 		vio_net_tx_update_t vtx_update =
1578 		    ldcp->portp->vcb.vio_net_tx_update;
1579 
1580 		ldcp->tx_blocked = B_FALSE;
1581 		vtx_update(ldcp->portp->vhp);
1582 	}
1583 }
1584 
1585 /*
1586  * Send descriptor ring data message to the peer over ldc.
1587  */
1588 static int
1589 vgen_send_dringdata(vgen_ldc_t *ldcp, uint32_t start, int32_t end)
1590 {
1591 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
1592 	vio_dring_msg_t	dringmsg, *msgp = &dringmsg;
1593 	vio_msg_tag_t	*tagp = &msgp->tag;
1594 	vgen_stats_t	*statsp = &ldcp->stats;
1595 	int		rv;
1596 
1597 #ifdef DEBUG
1598 	if (vgen_inject_error(ldcp, VGEN_ERR_TXTIMEOUT)) {
1599 		return (VGEN_SUCCESS);
1600 	}
1601 #endif
1602 	bzero(msgp, sizeof (*msgp));
1603 
1604 	tagp->vio_msgtype = VIO_TYPE_DATA;
1605 	tagp->vio_subtype = VIO_SUBTYPE_INFO;
1606 	tagp->vio_subtype_env = VIO_DRING_DATA;
1607 	tagp->vio_sid = ldcp->local_sid;
1608 
1609 	msgp->dring_ident = ldcp->local_hparams.dring_ident;
1610 	msgp->start_idx = start;
1611 	msgp->end_idx = end;
1612 
1613 	rv = vgen_sendmsg(ldcp, (caddr_t)tagp, sizeof (dringmsg), B_TRUE);
1614 	if (rv != VGEN_SUCCESS) {
1615 		DWARN(vgenp, ldcp, "vgen_sendmsg failed\n");
1616 		return (rv);
1617 	}
1618 
1619 	statsp->dring_data_msgs_sent++;
1620 
1621 	DBG2(vgenp, ldcp, "DRING_DATA_SENT \n");
1622 
1623 	return (VGEN_SUCCESS);
1624 }
1625 
1626 /*
1627  * Send dring data ack message.
1628  */
1629 static int
1630 vgen_send_dringack(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp, uint32_t start,
1631     int32_t end, uint8_t pstate)
1632 {
1633 	int		rv = 0;
1634 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
1635 	vio_dring_msg_t	*msgp = (vio_dring_msg_t *)tagp;
1636 	vgen_stats_t	*statsp = &ldcp->stats;
1637 
1638 	tagp->vio_msgtype = VIO_TYPE_DATA;
1639 	tagp->vio_subtype = VIO_SUBTYPE_ACK;
1640 	tagp->vio_subtype_env = VIO_DRING_DATA;
1641 	tagp->vio_sid = ldcp->local_sid;
1642 	msgp->start_idx = start;
1643 	msgp->end_idx = end;
1644 	msgp->dring_process_state = pstate;
1645 
1646 	rv = vgen_sendmsg(ldcp, (caddr_t)tagp, sizeof (*msgp), B_FALSE);
1647 	if (rv != VGEN_SUCCESS) {
1648 		DWARN(vgenp, ldcp, "vgen_sendmsg() failed\n");
1649 	}
1650 
1651 	statsp->dring_data_acks_sent++;
1652 	if (pstate == VIO_DP_STOPPED) {
1653 		statsp->dring_stopped_acks_sent++;
1654 	}
1655 
1656 	return (rv);
1657 }
1658 
1659 /*
1660  * Wrapper routine to send the given message over ldc using ldc_write().
1661  */
1662 int
1663 vgen_sendmsg(vgen_ldc_t *ldcp, caddr_t msg,  size_t msglen,
1664     boolean_t caller_holds_lock)
1665 {
1666 	int			rv;
1667 	size_t			len;
1668 	uint32_t		retries = 0;
1669 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
1670 	vio_msg_tag_t		*tagp = (vio_msg_tag_t *)msg;
1671 	vio_dring_msg_t		*dmsg;
1672 	vio_raw_data_msg_t	*rmsg;
1673 	boolean_t		data_msg = B_FALSE;
1674 
1675 	len = msglen;
1676 	if ((len == 0) || (msg == NULL))
1677 		return (VGEN_FAILURE);
1678 
1679 	if (!caller_holds_lock) {
1680 		mutex_enter(&ldcp->wrlock);
1681 	}
1682 
1683 	if (tagp->vio_subtype == VIO_SUBTYPE_INFO) {
1684 		if (tagp->vio_subtype_env == VIO_DRING_DATA) {
1685 			dmsg = (vio_dring_msg_t *)tagp;
1686 			dmsg->seq_num = ldcp->next_txseq;
1687 			data_msg = B_TRUE;
1688 		} else if (tagp->vio_subtype_env == VIO_PKT_DATA) {
1689 			rmsg = (vio_raw_data_msg_t *)tagp;
1690 			rmsg->seq_num = ldcp->next_txseq;
1691 			data_msg = B_TRUE;
1692 		}
1693 	}
1694 
1695 	do {
1696 		len = msglen;
1697 		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msg, &len);
1698 		if (retries++ >= vgen_ldcwr_retries)
1699 			break;
1700 	} while (rv == EWOULDBLOCK);
1701 
1702 	if (rv == 0 && data_msg == B_TRUE) {
1703 		ldcp->next_txseq++;
1704 	}
1705 
1706 	if (!caller_holds_lock) {
1707 		mutex_exit(&ldcp->wrlock);
1708 	}
1709 
1710 	if (rv != 0) {
1711 		DWARN(vgenp, ldcp, "ldc_write failed: rv(%d)\n",
1712 		    rv, msglen);
1713 		return (rv);
1714 	}
1715 
1716 	if (len != msglen) {
1717 		DWARN(vgenp, ldcp, "ldc_write failed: rv(%d) msglen (%d)\n",
1718 		    rv, msglen);
1719 		return (VGEN_FAILURE);
1720 	}
1721 
1722 	return (VGEN_SUCCESS);
1723 }
1724 
1725 int
1726 vgen_check_datamsg_seq(vgen_ldc_t *ldcp, vio_msg_tag_t *tagp)
1727 {
1728 	vio_raw_data_msg_t	*rmsg;
1729 	vio_dring_msg_t		*dmsg;
1730 	uint64_t		seq_num;
1731 	vgen_t			*vgenp = LDC_TO_VGEN(ldcp);
1732 
1733 	if (tagp->vio_subtype_env == VIO_DRING_DATA) {
1734 		dmsg = (vio_dring_msg_t *)tagp;
1735 		seq_num = dmsg->seq_num;
1736 	} else if (tagp->vio_subtype_env == VIO_PKT_DATA) {
1737 		rmsg = (vio_raw_data_msg_t *)tagp;
1738 		seq_num = rmsg->seq_num;
1739 	} else {
1740 		return (EINVAL);
1741 	}
1742 
1743 	if (seq_num != ldcp->next_rxseq) {
1744 
1745 		/* seqnums don't match */
1746 		DWARN(vgenp, ldcp,
1747 		    "next_rxseq(0x%lx) != seq_num(0x%lx)\n",
1748 		    ldcp->next_rxseq, seq_num);
1749 		return (EINVAL);
1750 
1751 	}
1752 
1753 	ldcp->next_rxseq++;
1754 
1755 	return (0);
1756 }
1757 
1758 /*
1759  * vgen_ldc_msg_worker -- A per LDC worker thread. This thread is woken up by
1760  * the LDC interrupt handler to process LDC packets and receive data.
1761  */
1762 void
1763 vgen_ldc_msg_worker(void *arg)
1764 {
1765 	callb_cpr_t	cprinfo;
1766 	vgen_ldc_t	*ldcp = (vgen_ldc_t *)arg;
1767 	vgen_t 		*vgenp = LDC_TO_VGEN(ldcp);
1768 	int		rv;
1769 
1770 	DBG1(vgenp, ldcp, "enter\n");
1771 	CALLB_CPR_INIT(&cprinfo, &ldcp->msg_thr_lock, callb_generic_cpr,
1772 	    "vnet_rcv_thread");
1773 	mutex_enter(&ldcp->msg_thr_lock);
1774 	while (!(ldcp->msg_thr_flags & VGEN_WTHR_STOP)) {
1775 
1776 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
1777 		/*
1778 		 * Wait until the data is received or a stop
1779 		 * request is received.
1780 		 */
1781 		while (!(ldcp->msg_thr_flags &
1782 		    (VGEN_WTHR_DATARCVD | VGEN_WTHR_STOP))) {
1783 			cv_wait(&ldcp->msg_thr_cv, &ldcp->msg_thr_lock);
1784 		}
1785 		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->msg_thr_lock)
1786 
1787 		/*
1788 		 * First process the stop request.
1789 		 */
1790 		if (ldcp->msg_thr_flags & VGEN_WTHR_STOP) {
1791 			DBG2(vgenp, ldcp, "stopped\n");
1792 			break;
1793 		}
1794 		ldcp->msg_thr_flags &= ~VGEN_WTHR_DATARCVD;
1795 		ldcp->msg_thr_flags |= VGEN_WTHR_PROCESSING;
1796 		mutex_exit(&ldcp->msg_thr_lock);
1797 		DBG2(vgenp, ldcp, "calling vgen_handle_evt_read\n");
1798 		rv = vgen_handle_evt_read(ldcp, VGEN_MSG_THR);
1799 		mutex_enter(&ldcp->msg_thr_lock);
1800 		ldcp->msg_thr_flags &= ~VGEN_WTHR_PROCESSING;
1801 		if (rv != 0) {
1802 			/*
1803 			 * Channel has been reset. The thread should now exit.
1804 			 * The thread may be recreated if TxDring is negotiated
1805 			 * on this channel after the channel comes back up
1806 			 * again.
1807 			 */
1808 			ldcp->msg_thr_flags |= VGEN_WTHR_STOP;
1809 			break;
1810 		}
1811 	}
1812 
1813 	/*
1814 	 * Update the run status and wakeup the thread that
1815 	 * has sent the stop request.
1816 	 */
1817 	ldcp->msg_thr_flags &= ~VGEN_WTHR_STOP;
1818 	ldcp->msg_thread = NULL;
1819 	CALLB_CPR_EXIT(&cprinfo);
1820 
1821 	thread_exit();
1822 	DBG1(vgenp, ldcp, "exit\n");
1823 }
1824 
1825 /* vgen_stop_msg_thread -- Co-ordinate with receive thread to stop it */
1826 void
1827 vgen_stop_msg_thread(vgen_ldc_t *ldcp)
1828 {
1829 	kt_did_t	tid = 0;
1830 	vgen_t		*vgenp = LDC_TO_VGEN(ldcp);
1831 
1832 	DBG1(vgenp, ldcp, "enter\n");
1833 	/*
1834 	 * Send a stop request by setting the stop flag and
1835 	 * wait until the receive thread stops.
1836 	 */
1837 	mutex_enter(&ldcp->msg_thr_lock);
1838 	if (ldcp->msg_thread != NULL) {
1839 		tid = ldcp->msg_thread->t_did;
1840 		ldcp->msg_thr_flags |= VGEN_WTHR_STOP;
1841 		cv_signal(&ldcp->msg_thr_cv);
1842 	}
1843 	mutex_exit(&ldcp->msg_thr_lock);
1844 
1845 	if (tid != 0) {
1846 		thread_join(tid);
1847 	}
1848 	DBG1(vgenp, ldcp, "exit\n");
1849 }
1850