xref: /illumos-gate/usr/src/uts/intel/io/viona/viona_ring.c (revision 71f3ceb939e47627273608fb7ea4b3aa1c3b37e7)
1 /*
2  * Copyright (c) 2013  Chris Torek <torek @ torek net>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 /*
27  * This file and its contents are supplied under the terms of the
28  * Common Development and Distribution License ("CDDL"), version 1.0.
29  * You may only use this file in accordance with the terms of version
30  * 1.0 of the CDDL.
31  *
32  * A full copy of the text of the CDDL should have accompanied this
33  * source.  A copy of the CDDL is also available via the Internet at
34  * http://www.illumos.org/license/CDDL.
35  *
36  * Copyright 2015 Pluribus Networks Inc.
37  * Copyright 2019 Joyent, Inc.
38  * Copyright 2024 Oxide Computer Company
39  */
40 
41 
42 #include <sys/disp.h>
43 
44 #include "viona_impl.h"
45 
/* Largest queue size (in entries) accepted by viona_ring_init(). */
#define	VRING_MAX_LEN		32768

/*
 * Layout and sizing as defined in the spec for a legacy-style virtqueue.
 *
 * The macros below describe a single region which holds the descriptor table,
 * immediately followed by the available ring, with the used ring beginning at
 * the next LEGACY_VQ_ALIGN boundary.  Every *_OFF value is a byte offset from
 * the start of that region.
 */

#define	LEGACY_VQ_ALIGN		PAGESIZE

/* Size of the descriptor table: qsz entries of struct virtio_desc. */
#define	LEGACY_DESC_SZ(qsz)	((qsz) * sizeof (struct virtio_desc))
/*
 * Available ring consists of flags (uint16_t), avail_idx (uint16_t), qsz avail
 * descriptors (uint16_t each), and (optional) used_event (uint16_t).
 */
#define	LEGACY_AVAIL_SZ(qsz)	(((qsz) + 3) * sizeof (uint16_t))
/*
 * Used ring consists of flags (uint16_t), used_idx (uint16_t), qsz used
 * descriptors (two uint32_t each), and (optional) avail_event (uint16_t).
 */
#define	LEGACY_USED_SZ(qsz)	\
	((qsz) * sizeof (struct virtio_used) + 3 * sizeof (uint16_t))

/* Offsets of the available ring fields, which follow the descriptor table. */
#define	LEGACY_AVAIL_FLAGS_OFF(qsz)	LEGACY_DESC_SZ(qsz)
#define	LEGACY_AVAIL_IDX_OFF(qsz)	\
	(LEGACY_DESC_SZ(qsz) + sizeof (uint16_t))
#define	LEGACY_AVAIL_ENT_OFF(qsz, idx)	\
	(LEGACY_DESC_SZ(qsz) + (2 + (idx)) * sizeof (uint16_t))

/*
 * Offsets of the used ring fields.  The used ring starts at the first
 * LEGACY_VQ_ALIGN boundary past the end of the available ring.
 */
#define	LEGACY_USED_FLAGS_OFF(qsz)	\
	P2ROUNDUP(LEGACY_DESC_SZ(qsz) + LEGACY_AVAIL_SZ(qsz), LEGACY_VQ_ALIGN)
#define	LEGACY_USED_IDX_OFF(qsz)	\
	(LEGACY_USED_FLAGS_OFF(qsz) + sizeof (uint16_t))
#define	LEGACY_USED_ENT_OFF(qsz, idx)	\
	(LEGACY_USED_FLAGS_OFF(qsz) + 2 * sizeof (uint16_t) + \
	(idx) * sizeof (struct virtio_used))

/*
 * Total size of the virtqueue region, with the used ring padded out to a
 * LEGACY_VQ_ALIGN boundary, and the same quantity expressed in pages.
 */
#define	LEGACY_VQ_SIZE(qsz)	\
	(LEGACY_USED_FLAGS_OFF(qsz) + \
	P2ROUNDUP(LEGACY_USED_SZ(qsz), LEGACY_VQ_ALIGN))
#define	LEGACY_VQ_PAGES(qsz)	(LEGACY_VQ_SIZE(qsz) / PAGESIZE)
83 
/*
 * Accumulator used while mapping the buffers of a descriptor chain: it pairs
 * the iovec array being filled in with the chain of held guest pages which
 * back those iovec entries.
 */
struct vq_held_region {
	/* Array of iovec entries to be populated */
	struct iovec	*vhr_iov;
	/* First and last entries in the chain of held pages (NULL if empty) */
	vmm_page_t	*vhr_head;
	vmm_page_t	*vhr_tail;
	/* Length of iovec array supplied in `vhr_iov` */
	uint_t		vhr_niov;
	/*
	 * Index into vhr_iov, indicating the next "free" entry (following the
	 * last entry which has valid contents).
	 */
	uint_t		vhr_idx;

	/* Total length of populated entries in `vhr_iov` */
	uint32_t	vhr_len;
};
typedef struct vq_held_region vq_held_region_t;
100 
/* Prototypes for static functions which are referenced before definition. */
static bool viona_ring_map(viona_vring_t *, bool);
static void viona_ring_unmap(viona_vring_t *);
static kthread_t *viona_create_worker(viona_vring_t *);
static void viona_ring_consolidate_stats(viona_vring_t *);
105 
106 static vmm_page_t *
107 vq_page_hold(viona_vring_t *ring, uint64_t gpa, bool writable)
108 {
109 	ASSERT3P(ring->vr_lease, !=, NULL);
110 
111 	int prot = PROT_READ;
112 	if (writable) {
113 		prot |= PROT_WRITE;
114 	}
115 
116 	return (vmm_drv_page_hold(ring->vr_lease, gpa, prot));
117 }
118 
119 /*
120  * Establish a hold on the page(s) which back the region of guest memory covered
121  * by [gpa, gpa + len).  The host-kernel-virtual pointers to those pages are
122  * stored in the iovec array supplied in `region`, along with the chain of
123  * vmm_page_t entries representing the held pages.  Since guest memory
124  * carries no guarantees of being physically contiguous (on the host), it is
125  * assumed that an iovec entry will be required for each PAGESIZE section
126  * covered by the specified `gpa` and `len` range.  For each iovec entry
127  * successfully populated by holding a page, `vhr_idx` will be incremented so it
128  * references the next available iovec entry (or `vhr_niov`, if the iovec array
129  * is full).  The responsibility for releasing the `vmm_page_t` chain (stored in
130  * `vhr_head` and `vhr_tail`) resides with the caller, regardless of the result.
131  */
132 static int
133 vq_region_hold(viona_vring_t *ring, uint64_t gpa, uint32_t len,
134     bool writable, vq_held_region_t *region)
135 {
136 	const uint32_t front_offset = gpa & PAGEOFFSET;
137 	const uint32_t front_len = MIN(len, PAGESIZE - front_offset);
138 	uint_t pages = 1;
139 	vmm_page_t *vmp;
140 	caddr_t buf;
141 
142 	ASSERT3U(region->vhr_idx, <, region->vhr_niov);
143 
144 	if (front_len < len) {
145 		pages += P2ROUNDUP((uint64_t)(len - front_len),
146 		    PAGESIZE) / PAGESIZE;
147 	}
148 	if (pages > (region->vhr_niov - region->vhr_idx)) {
149 		return (E2BIG);
150 	}
151 
152 	vmp = vq_page_hold(ring, gpa & PAGEMASK, writable);
153 	if (vmp == NULL) {
154 		return (EFAULT);
155 	}
156 	buf = (caddr_t)vmm_drv_page_readable(vmp);
157 
158 	region->vhr_iov[region->vhr_idx].iov_base = buf + front_offset;
159 	region->vhr_iov[region->vhr_idx].iov_len = front_len;
160 	region->vhr_idx++;
161 	gpa += front_len;
162 	len -= front_len;
163 	if (region->vhr_head == NULL) {
164 		region->vhr_head = vmp;
165 		region->vhr_tail = vmp;
166 	} else {
167 		vmm_drv_page_chain(region->vhr_tail, vmp);
168 		region->vhr_tail = vmp;
169 	}
170 
171 	for (uint_t i = 1; i < pages; i++) {
172 		ASSERT3U(gpa & PAGEOFFSET, ==, 0);
173 
174 		vmp = vq_page_hold(ring, gpa, writable);
175 		if (vmp == NULL) {
176 			return (EFAULT);
177 		}
178 		buf = (caddr_t)vmm_drv_page_readable(vmp);
179 
180 		const uint32_t chunk_len = MIN(len, PAGESIZE);
181 		region->vhr_iov[region->vhr_idx].iov_base = buf;
182 		region->vhr_iov[region->vhr_idx].iov_len = chunk_len;
183 		region->vhr_idx++;
184 		gpa += chunk_len;
185 		len -= chunk_len;
186 		vmm_drv_page_chain(region->vhr_tail, vmp);
187 		region->vhr_tail = vmp;
188 	}
189 
190 	return (0);
191 }
192 
193 static boolean_t
194 viona_ring_lease_expire_cb(void *arg)
195 {
196 	viona_vring_t *ring = arg;
197 
198 	mutex_enter(&ring->vr_lock);
199 	cv_broadcast(&ring->vr_cv);
200 	mutex_exit(&ring->vr_lock);
201 
202 	/* The lease will be broken asynchronously. */
203 	return (B_FALSE);
204 }
205 
206 static void
207 viona_ring_lease_drop(viona_vring_t *ring)
208 {
209 	ASSERT(MUTEX_HELD(&ring->vr_lock));
210 
211 	if (ring->vr_lease != NULL) {
212 		vmm_hold_t *hold = ring->vr_link->l_vm_hold;
213 
214 		ASSERT(hold != NULL);
215 
216 		/*
217 		 * Without an active lease, the ring mappings cannot be
218 		 * considered valid.
219 		 */
220 		viona_ring_unmap(ring);
221 
222 		vmm_drv_lease_break(hold, ring->vr_lease);
223 		ring->vr_lease = NULL;
224 	}
225 }
226 
227 boolean_t
228 viona_ring_lease_renew(viona_vring_t *ring)
229 {
230 	vmm_hold_t *hold = ring->vr_link->l_vm_hold;
231 
232 	ASSERT(hold != NULL);
233 	ASSERT(MUTEX_HELD(&ring->vr_lock));
234 
235 	viona_ring_lease_drop(ring);
236 
237 	/*
238 	 * Lease renewal will fail if the VM has requested that all holds be
239 	 * cleaned up.
240 	 */
241 	ring->vr_lease = vmm_drv_lease_sign(hold, viona_ring_lease_expire_cb,
242 	    ring);
243 	if (ring->vr_lease != NULL) {
244 		/* A ring undergoing renewal will need valid guest mappings */
245 		if (ring->vr_pa != 0 && ring->vr_size != 0) {
246 			/*
247 			 * If new mappings cannot be established, consider the
248 			 * lease renewal a failure.
249 			 */
250 			if (!viona_ring_map(ring, ring->vr_state == VRS_INIT)) {
251 				viona_ring_lease_drop(ring);
252 				return (B_FALSE);
253 			}
254 		}
255 	}
256 	return (ring->vr_lease != NULL);
257 }
258 
259 void
260 viona_ring_alloc(viona_link_t *link, viona_vring_t *ring)
261 {
262 	ring->vr_link = link;
263 	mutex_init(&ring->vr_lock, NULL, MUTEX_DRIVER, NULL);
264 	cv_init(&ring->vr_cv, NULL, CV_DRIVER, NULL);
265 	mutex_init(&ring->vr_a_mutex, NULL, MUTEX_DRIVER, NULL);
266 	mutex_init(&ring->vr_u_mutex, NULL, MUTEX_DRIVER, NULL);
267 }
268 
269 static void
270 viona_ring_misc_free(viona_vring_t *ring)
271 {
272 	const uint_t qsz = ring->vr_size;
273 
274 	viona_tx_ring_free(ring, qsz);
275 }
276 
277 void
278 viona_ring_free(viona_vring_t *ring)
279 {
280 	mutex_destroy(&ring->vr_lock);
281 	cv_destroy(&ring->vr_cv);
282 	mutex_destroy(&ring->vr_a_mutex);
283 	mutex_destroy(&ring->vr_u_mutex);
284 	ring->vr_link = NULL;
285 }
286 
287 int
288 viona_ring_init(viona_link_t *link, uint16_t idx,
289     const struct viona_ring_params *params)
290 {
291 	viona_vring_t *ring;
292 	kthread_t *t;
293 	int err = 0;
294 	const uint16_t qsz = params->vrp_size;
295 	const uint64_t pa = params->vrp_pa;
296 
297 	if (idx >= VIONA_VQ_MAX) {
298 		return (EINVAL);
299 	}
300 
301 	if (qsz == 0 || qsz > VRING_MAX_LEN || (1 << (ffs(qsz) - 1)) != qsz) {
302 		return (EINVAL);
303 	}
304 	if ((pa & (LEGACY_VQ_ALIGN - 1)) != 0) {
305 		return (EINVAL);
306 	}
307 
308 	ring = &link->l_vrings[idx];
309 	mutex_enter(&ring->vr_lock);
310 	if (ring->vr_state != VRS_RESET) {
311 		mutex_exit(&ring->vr_lock);
312 		return (EBUSY);
313 	}
314 	VERIFY(ring->vr_state_flags == 0);
315 
316 	ring->vr_lease = NULL;
317 	if (!viona_ring_lease_renew(ring)) {
318 		err = EBUSY;
319 		goto fail;
320 	}
321 
322 	ring->vr_size = qsz;
323 	ring->vr_mask = (ring->vr_size - 1);
324 	ring->vr_pa = pa;
325 	if (!viona_ring_map(ring, true)) {
326 		err = EINVAL;
327 		goto fail;
328 	}
329 
330 	/* Initialize queue indexes */
331 	ring->vr_cur_aidx = params->vrp_avail_idx;
332 	ring->vr_cur_uidx = params->vrp_used_idx;
333 
334 	if (idx == VIONA_VQ_TX) {
335 		viona_tx_ring_alloc(ring, qsz);
336 	}
337 
338 	/* Zero out MSI-X configuration */
339 	ring->vr_msi_addr = 0;
340 	ring->vr_msi_msg = 0;
341 
342 	/* Clear the stats */
343 	bzero(&ring->vr_stats, sizeof (ring->vr_stats));
344 	bzero(&ring->vr_err_stats, sizeof (ring->vr_err_stats));
345 
346 	t = viona_create_worker(ring);
347 	if (t == NULL) {
348 		err = ENOMEM;
349 		goto fail;
350 	}
351 	ring->vr_worker_thread = t;
352 	ring->vr_state = VRS_SETUP;
353 	cv_broadcast(&ring->vr_cv);
354 	mutex_exit(&ring->vr_lock);
355 	return (0);
356 
357 fail:
358 	viona_ring_lease_drop(ring);
359 	viona_ring_misc_free(ring);
360 	ring->vr_size = 0;
361 	ring->vr_mask = 0;
362 	ring->vr_pa = 0;
363 	ring->vr_cur_aidx = 0;
364 	ring->vr_cur_uidx = 0;
365 	mutex_exit(&ring->vr_lock);
366 	return (err);
367 }
368 
369 int
370 viona_ring_get_state(viona_link_t *link, uint16_t idx,
371     struct viona_ring_params *params)
372 {
373 	viona_vring_t *ring;
374 
375 	if (idx >= VIONA_VQ_MAX) {
376 		return (EINVAL);
377 	}
378 
379 	ring = &link->l_vrings[idx];
380 	mutex_enter(&ring->vr_lock);
381 
382 	params->vrp_size = ring->vr_size;
383 	params->vrp_pa = ring->vr_pa;
384 
385 	if (ring->vr_state == VRS_RUN) {
386 		/* On a running ring, we must heed the avail/used locks */
387 		mutex_enter(&ring->vr_a_mutex);
388 		params->vrp_avail_idx = ring->vr_cur_aidx;
389 		mutex_exit(&ring->vr_a_mutex);
390 		mutex_enter(&ring->vr_u_mutex);
391 		params->vrp_used_idx = ring->vr_cur_uidx;
392 		mutex_exit(&ring->vr_u_mutex);
393 	} else {
394 		/* Otherwise vr_lock is adequate protection */
395 		params->vrp_avail_idx = ring->vr_cur_aidx;
396 		params->vrp_used_idx = ring->vr_cur_uidx;
397 	}
398 
399 	mutex_exit(&ring->vr_lock);
400 
401 	return (0);
402 }
403 
404 int
405 viona_ring_reset(viona_vring_t *ring, boolean_t heed_signals)
406 {
407 	mutex_enter(&ring->vr_lock);
408 	if (ring->vr_state == VRS_RESET) {
409 		mutex_exit(&ring->vr_lock);
410 		return (0);
411 	}
412 
413 	if ((ring->vr_state_flags & VRSF_REQ_STOP) == 0) {
414 		ring->vr_state_flags |= VRSF_REQ_STOP;
415 		cv_broadcast(&ring->vr_cv);
416 	}
417 	while (ring->vr_state != VRS_RESET) {
418 		if (!heed_signals) {
419 			cv_wait(&ring->vr_cv, &ring->vr_lock);
420 		} else {
421 			int rs;
422 
423 			rs = cv_wait_sig(&ring->vr_cv, &ring->vr_lock);
424 			if (rs <= 0 && ring->vr_state != VRS_RESET) {
425 				mutex_exit(&ring->vr_lock);
426 				return (EINTR);
427 			}
428 		}
429 	}
430 	mutex_exit(&ring->vr_lock);
431 	return (0);
432 }
433 
434 static bool
435 viona_ring_map(viona_vring_t *ring, bool defer_dirty)
436 {
437 	const uint16_t qsz = ring->vr_size;
438 	uintptr_t pa = ring->vr_pa;
439 
440 	ASSERT3U(qsz, !=, 0);
441 	ASSERT3U(qsz, <=, VRING_MAX_LEN);
442 	ASSERT3U(pa, !=, 0);
443 	ASSERT3U(pa & (LEGACY_VQ_ALIGN - 1), ==, 0);
444 	ASSERT3U(LEGACY_VQ_ALIGN, ==, PAGESIZE);
445 	ASSERT(MUTEX_HELD(&ring->vr_lock));
446 	ASSERT3P(ring->vr_map_pages, ==, NULL);
447 
448 	const uint_t npages = LEGACY_VQ_PAGES(qsz);
449 	ring->vr_map_pages = kmem_zalloc(npages * sizeof (void *), KM_SLEEP);
450 
451 	int page_flags = 0;
452 	if (defer_dirty) {
453 		/*
454 		 * During initialization, and when entering the paused state,
455 		 * the page holds for a virtqueue are established with the
456 		 * DEFER_DIRTY flag set.
457 		 *
458 		 * This prevents those page holds from immediately marking the
459 		 * underlying pages as dirty, since the viona emulation is not
460 		 * yet performing any accesses.  Once the ring transitions to
461 		 * the VRS_RUN state, the held pages will be marked as dirty.
462 		 *
463 		 * Any ring mappings performed outside those state conditions,
464 		 * such as those part of vmm_lease renewal during steady-state
465 		 * operation, will map the ring pages normally (as considered
466 		 * immediately dirty).
467 		 */
468 		page_flags |= VMPF_DEFER_DIRTY;
469 	}
470 
471 	vmm_page_t *prev = NULL;
472 	for (uint_t i = 0; i < npages; i++, pa += PAGESIZE) {
473 		vmm_page_t *vmp;
474 
475 		vmp = vmm_drv_page_hold_ext(ring->vr_lease, pa,
476 		    PROT_READ | PROT_WRITE, page_flags);
477 		if (vmp == NULL) {
478 			viona_ring_unmap(ring);
479 			return (false);
480 		}
481 
482 		/*
483 		 * Keep the first page has the head of the chain, appending all
484 		 * subsequent pages to the tail.
485 		 */
486 		if (prev == NULL) {
487 			ring->vr_map_hold = vmp;
488 		} else {
489 			vmm_drv_page_chain(prev, vmp);
490 		}
491 		prev = vmp;
492 		ring->vr_map_pages[i] = vmm_drv_page_writable(vmp);
493 	}
494 
495 	return (true);
496 }
497 
498 static void
499 viona_ring_mark_dirty(viona_vring_t *ring)
500 {
501 	ASSERT(MUTEX_HELD(&ring->vr_lock));
502 	ASSERT(ring->vr_map_hold != NULL);
503 
504 	for (vmm_page_t *vp = ring->vr_map_hold; vp != NULL;
505 	    vp = vmm_drv_page_next(vp)) {
506 		vmm_drv_page_mark_dirty(vp);
507 	}
508 }
509 
510 static void
511 viona_ring_unmap(viona_vring_t *ring)
512 {
513 	ASSERT(MUTEX_HELD(&ring->vr_lock));
514 
515 	void **map = ring->vr_map_pages;
516 	if (map != NULL) {
517 		const uint_t npages = LEGACY_VQ_PAGES(ring->vr_size);
518 		kmem_free(map, npages * sizeof (void *));
519 		ring->vr_map_pages = NULL;
520 
521 		vmm_drv_page_release_chain(ring->vr_map_hold);
522 		ring->vr_map_hold = NULL;
523 	} else {
524 		ASSERT3P(ring->vr_map_hold, ==, NULL);
525 	}
526 }
527 
528 static inline void *
529 viona_ring_addr(viona_vring_t *ring, uint_t off)
530 {
531 	ASSERT3P(ring->vr_map_pages, !=, NULL);
532 	ASSERT3U(LEGACY_VQ_SIZE(ring->vr_size), >, off);
533 
534 	const uint_t page_num = off / PAGESIZE;
535 	const uint_t page_off = off % PAGESIZE;
536 	return ((caddr_t)ring->vr_map_pages[page_num] + page_off);
537 }
538 
539 void
540 viona_intr_ring(viona_vring_t *ring, boolean_t skip_flags_check)
541 {
542 	if (!skip_flags_check) {
543 		volatile uint16_t *avail_flags = viona_ring_addr(ring,
544 		    LEGACY_AVAIL_FLAGS_OFF(ring->vr_size));
545 
546 		if ((*avail_flags & VRING_AVAIL_F_NO_INTERRUPT) != 0) {
547 			return;
548 		}
549 	}
550 
551 	mutex_enter(&ring->vr_lock);
552 	uint64_t addr = ring->vr_msi_addr;
553 	uint64_t msg = ring->vr_msi_msg;
554 	mutex_exit(&ring->vr_lock);
555 	if (addr != 0) {
556 		/* Deliver the interrupt directly, if so configured... */
557 		(void) vmm_drv_msi(ring->vr_lease, addr, msg);
558 	} else {
559 		/* ... otherwise, leave it to userspace */
560 		if (atomic_cas_uint(&ring->vr_intr_enabled, 0, 1) == 0) {
561 			pollwakeup(&ring->vr_link->l_pollhead, POLLRDBAND);
562 		}
563 	}
564 }
565 
566 static inline bool
567 vring_stop_req(const viona_vring_t *ring)
568 {
569 	return ((ring->vr_state_flags & VRSF_REQ_STOP) != 0);
570 }
571 
572 static inline bool
573 vring_pause_req(const viona_vring_t *ring)
574 {
575 	return ((ring->vr_state_flags & VRSF_REQ_PAUSE) != 0);
576 }
577 
578 static inline bool
579 vring_start_req(const viona_vring_t *ring)
580 {
581 	return ((ring->vr_state_flags & VRSF_REQ_START) != 0);
582 }
583 
584 /*
585  * Check if vring worker thread should bail out.  This will heed indications
586  * that the containing process is exiting, as well as requests to stop or pause
587  * the ring.  The `stop_only` parameter controls if pause requests are ignored
588  * (true) or checked (false).
589  *
590  * Caller should hold vr_lock.
591  */
592 static bool
593 vring_need_bail_ext(const viona_vring_t *ring, bool stop_only)
594 {
595 	ASSERT(MUTEX_HELD(&ring->vr_lock));
596 
597 	if (vring_stop_req(ring) ||
598 	    (!stop_only && vring_pause_req(ring))) {
599 		return (true);
600 	}
601 
602 	kthread_t *t = ring->vr_worker_thread;
603 	if (t != NULL) {
604 		proc_t *p = ttoproc(t);
605 
606 		ASSERT(p != NULL);
607 		if ((p->p_flag & SEXITING) != 0) {
608 			return (true);
609 		}
610 	}
611 	return (false);
612 }
613 
614 bool
615 vring_need_bail(const viona_vring_t *ring)
616 {
617 	return (vring_need_bail_ext(ring, false));
618 }
619 
620 int
621 viona_ring_pause(viona_vring_t *ring)
622 {
623 	mutex_enter(&ring->vr_lock);
624 	switch (ring->vr_state) {
625 	case VRS_RESET:
626 	case VRS_SETUP:
627 	case VRS_INIT:
628 		/*
629 		 * For rings which have not yet started (even those in the
630 		 * VRS_SETUP and VRS_INIT phases, where there a running worker
631 		 * thread (waiting to be released to do its intended task), it
632 		 * is adequate to simply clear any start request, to keep them
633 		 * from proceeding into the actual work processing function.
634 		 */
635 		ring->vr_state_flags &= ~VRSF_REQ_START;
636 		mutex_exit(&ring->vr_lock);
637 		return (0);
638 
639 	case VRS_STOP:
640 		if ((ring->vr_state_flags & VRSF_REQ_STOP) != 0) {
641 			/* A ring on its way to RESET cannot be paused. */
642 			mutex_exit(&ring->vr_lock);
643 			return (EBUSY);
644 		}
645 		/* FALLTHROUGH */
646 	case VRS_RUN:
647 		ring->vr_state_flags |= VRSF_REQ_PAUSE;
648 		cv_broadcast(&ring->vr_cv);
649 		break;
650 
651 	default:
652 		panic("invalid ring state %d", ring->vr_state);
653 		break;
654 	}
655 
656 	for (;;) {
657 		int res = cv_wait_sig(&ring->vr_cv, &ring->vr_lock);
658 
659 		if (ring->vr_state == VRS_INIT ||
660 		    (ring->vr_state_flags & VRSF_REQ_PAUSE) == 0) {
661 			/* Ring made it to (or through) paused state */
662 			mutex_exit(&ring->vr_lock);
663 			return (0);
664 		}
665 		if (res == 0) {
666 			/* interrupted by signal */
667 			mutex_exit(&ring->vr_lock);
668 			return (EINTR);
669 		}
670 	}
671 	/* NOTREACHED */
672 }
673 
674 static void
675 viona_worker(void *arg)
676 {
677 	viona_vring_t *ring = (viona_vring_t *)arg;
678 	viona_link_t *link = ring->vr_link;
679 
680 	mutex_enter(&ring->vr_lock);
681 	VERIFY3U(ring->vr_state, ==, VRS_SETUP);
682 
683 	/* Bail immediately if ring shutdown or process exit was requested */
684 	if (vring_need_bail_ext(ring, true)) {
685 		goto ring_reset;
686 	}
687 
688 	/* Report worker thread as alive and notify creator */
689 ring_init:
690 	ring->vr_state = VRS_INIT;
691 	cv_broadcast(&ring->vr_cv);
692 
693 	while (!vring_start_req(ring)) {
694 		/*
695 		 * Keeping lease renewals timely while waiting for the ring to
696 		 * be started is important for avoiding deadlocks.
697 		 */
698 		if (vmm_drv_lease_expired(ring->vr_lease)) {
699 			if (!viona_ring_lease_renew(ring)) {
700 				goto ring_reset;
701 			}
702 		}
703 
704 		(void) cv_wait_sig(&ring->vr_cv, &ring->vr_lock);
705 
706 		if (vring_pause_req(ring)) {
707 			/* We are already paused in the INIT state. */
708 			ring->vr_state_flags &= ~VRSF_REQ_PAUSE;
709 		}
710 		if (vring_need_bail_ext(ring, true)) {
711 			goto ring_reset;
712 		}
713 	}
714 
715 	ASSERT((ring->vr_state_flags & VRSF_REQ_START) != 0);
716 	ring->vr_state = VRS_RUN;
717 	ring->vr_state_flags &= ~VRSF_REQ_START;
718 	viona_ring_mark_dirty(ring);
719 
720 	/* Ensure ring lease is valid first */
721 	if (vmm_drv_lease_expired(ring->vr_lease)) {
722 		if (!viona_ring_lease_renew(ring)) {
723 			goto ring_reset;
724 		}
725 	}
726 
727 	/* Process actual work */
728 	if (ring == &link->l_vrings[VIONA_VQ_RX]) {
729 		viona_worker_rx(ring, link);
730 	} else if (ring == &link->l_vrings[VIONA_VQ_TX]) {
731 		viona_worker_tx(ring, link);
732 	} else {
733 		panic("unexpected ring: %p", (void *)ring);
734 	}
735 
736 	VERIFY3U(ring->vr_state, ==, VRS_STOP);
737 	VERIFY3U(ring->vr_xfer_outstanding, ==, 0);
738 
739 	/*
740 	 * Consolidate stats data so that it is not lost if/when this ring is
741 	 * being stopped.
742 	 */
743 	viona_ring_consolidate_stats(ring);
744 
745 	/* Respond to a pause request if the ring is not required to stop */
746 	if (vring_pause_req(ring)) {
747 		ring->vr_state_flags &= ~VRSF_REQ_PAUSE;
748 
749 		if (vring_need_bail_ext(ring, true)) {
750 			goto ring_reset;
751 		}
752 
753 		/*
754 		 * To complete pausing of the ring, unmap and re-map the pages
755 		 * underpinning the virtqueue.  This is to synchronize their
756 		 * dirty state in the backing page tables and restore the
757 		 * defer-dirty state on the held pages.
758 		 */
759 		viona_ring_unmap(ring);
760 		if (viona_ring_map(ring, true)) {
761 			goto ring_init;
762 		}
763 
764 		/*
765 		 * If the ring pages failed to be mapped, fallthrough to
766 		 * ring-reset like any other failure.
767 		 */
768 	}
769 
770 ring_reset:
771 	viona_ring_misc_free(ring);
772 
773 	viona_ring_lease_drop(ring);
774 	ring->vr_cur_aidx = 0;
775 	ring->vr_size = 0;
776 	ring->vr_mask = 0;
777 	ring->vr_pa = 0;
778 	ring->vr_state = VRS_RESET;
779 	ring->vr_state_flags = 0;
780 	ring->vr_worker_thread = NULL;
781 	cv_broadcast(&ring->vr_cv);
782 	mutex_exit(&ring->vr_lock);
783 
784 	mutex_enter(&ttoproc(curthread)->p_lock);
785 	lwp_exit();
786 }
787 
788 static kthread_t *
789 viona_create_worker(viona_vring_t *ring)
790 {
791 	k_sigset_t hold_set;
792 	proc_t *p = curproc;
793 	kthread_t *t;
794 	klwp_t *lwp;
795 
796 	ASSERT(MUTEX_HELD(&ring->vr_lock));
797 	ASSERT(ring->vr_state == VRS_RESET);
798 
799 	sigfillset(&hold_set);
800 	lwp = lwp_create(viona_worker, (void *)ring, 0, p, TS_STOPPED,
801 	    minclsyspri - 1, &hold_set, curthread->t_cid, 0);
802 	if (lwp == NULL) {
803 		return (NULL);
804 	}
805 
806 	t = lwptot(lwp);
807 	mutex_enter(&p->p_lock);
808 	t->t_proc_flag = (t->t_proc_flag & ~TP_HOLDLWP) | TP_KTHREAD;
809 	lwp_create_done(t);
810 	mutex_exit(&p->p_lock);
811 
812 	return (t);
813 }
814 
815 void
816 vq_read_desc(viona_vring_t *ring, uint16_t idx, struct virtio_desc *descp)
817 {
818 	const uint_t entry_off = idx * sizeof (struct virtio_desc);
819 
820 	ASSERT3U(idx, <, ring->vr_size);
821 
822 	bcopy(viona_ring_addr(ring, entry_off), descp, sizeof (*descp));
823 }
824 
825 static uint16_t
826 vq_read_avail(viona_vring_t *ring, uint16_t idx)
827 {
828 	ASSERT3U(idx, <, ring->vr_size);
829 
830 	volatile uint16_t *avail_ent =
831 	    viona_ring_addr(ring, LEGACY_AVAIL_ENT_OFF(ring->vr_size, idx));
832 	return (*avail_ent);
833 }
834 
835 /*
836  * Given a buffer descriptor `desc`, attempt to map the pages backing that
837  * region of guest physical memory, taking into account that there are no
838  * guarantees about guest-contiguous pages being host-contiguous.
839  */
840 static int
841 vq_map_desc_bufs(viona_vring_t *ring, const struct virtio_desc *desc,
842     vq_held_region_t *region)
843 {
844 	if (desc->vd_len == 0) {
845 		VIONA_PROBE2(desc_bad_len, viona_vring_t *, ring,
846 		    uint32_t, desc->vd_len);
847 		VIONA_RING_STAT_INCR(ring, desc_bad_len);
848 		return (EINVAL);
849 	} else if ((region->vhr_len + desc->vd_len) < region->vhr_len) {
850 		VIONA_PROBE1(len_overflow, viona_vring_t *, ring);
851 		VIONA_RING_STAT_INCR(ring, len_overflow);
852 		return (EOVERFLOW);
853 	}
854 
855 	int err = vq_region_hold(ring, desc->vd_addr, desc->vd_len,
856 	    (desc->vd_flags & VRING_DESC_F_WRITE) != 0, region);
857 	if (err == 0) {
858 		region->vhr_len += desc->vd_len;
859 	} else if (err == E2BIG) {
860 		VIONA_PROBE1(too_many_desc, viona_vring_t *, ring);
861 		VIONA_RING_STAT_INCR(ring, too_many_desc);
862 	} else if (err == EFAULT) {
863 		VIONA_PROBE_BAD_RING_ADDR(ring, desc->vd_addr);
864 		VIONA_RING_STAT_INCR(ring, bad_ring_addr);
865 	}
866 
867 	return (err);
868 }
869 
870 /*
871  * Walk an indirect buffer descriptor `desc`, attempting to map the pages
872  * backing the regions of guest memory covered by its constituent descriptors.
873  */
874 static int
875 vq_map_indir_desc_bufs(viona_vring_t *ring, const struct virtio_desc *desc,
876     vq_held_region_t *region)
877 {
878 	const uint16_t indir_count = desc->vd_len / sizeof (struct virtio_desc);
879 
880 	if ((desc->vd_len & 0xf) != 0 || indir_count == 0 ||
881 	    indir_count > ring->vr_size ||
882 	    desc->vd_addr > (desc->vd_addr + desc->vd_len)) {
883 		VIONA_PROBE2(indir_bad_len, viona_vring_t *, ring,
884 		    uint32_t, desc->vd_len);
885 		VIONA_RING_STAT_INCR(ring, indir_bad_len);
886 		return (EINVAL);
887 	}
888 
889 	uint16_t indir_next = 0;
890 	const uint8_t *buf = NULL;
891 	uint64_t buf_gpa = UINT64_MAX;
892 	vmm_page_t *vmp = NULL;
893 	int err = 0;
894 
895 	for (;;) {
896 		uint64_t indir_gpa =
897 		    desc->vd_addr + (indir_next * sizeof (struct virtio_desc));
898 		uint64_t indir_page = indir_gpa & PAGEMASK;
899 		struct virtio_desc vp;
900 
901 		/*
902 		 * Get a mapping for the page that the next indirect descriptor
903 		 * resides in, if has not already been done.
904 		 */
905 		if (indir_page != buf_gpa) {
906 			if (vmp != NULL) {
907 				vmm_drv_page_release(vmp);
908 			}
909 			vmp = vq_page_hold(ring, indir_page, false);
910 			if (vmp == NULL) {
911 				VIONA_PROBE_BAD_RING_ADDR(ring, indir_page);
912 				VIONA_RING_STAT_INCR(ring, bad_ring_addr);
913 				err = EFAULT;
914 				break;
915 			}
916 			buf_gpa = indir_page;
917 			buf = vmm_drv_page_readable(vmp);
918 		}
919 
920 		/*
921 		 * A copy of the indirect descriptor is made here, rather than
922 		 * simply using a reference pointer.  This prevents malicious or
923 		 * erroneous guest writes to the descriptor from fooling the
924 		 * flags/bounds verification through a race.
925 		 */
926 		bcopy(buf + (indir_gpa - indir_page), &vp, sizeof (vp));
927 
928 		if (vp.vd_flags & VRING_DESC_F_INDIRECT) {
929 			VIONA_PROBE1(indir_bad_nest, viona_vring_t *, ring);
930 			VIONA_RING_STAT_INCR(ring, indir_bad_nest);
931 			err = EINVAL;
932 			break;
933 		} else if (vp.vd_len == 0) {
934 			VIONA_PROBE2(desc_bad_len, viona_vring_t *, ring,
935 			    uint32_t, vp.vd_len);
936 			VIONA_RING_STAT_INCR(ring, desc_bad_len);
937 			err = EINVAL;
938 			break;
939 		}
940 
941 		err = vq_map_desc_bufs(ring, &vp, region);
942 		if (err != 0) {
943 			break;
944 		}
945 
946 		/* Successfully reach the end of the indir chain */
947 		if ((vp.vd_flags & VRING_DESC_F_NEXT) == 0) {
948 			break;
949 		}
950 		if (region->vhr_idx >= region->vhr_niov) {
951 			VIONA_PROBE1(too_many_desc, viona_vring_t *, ring);
952 			VIONA_RING_STAT_INCR(ring, too_many_desc);
953 			err = E2BIG;
954 			break;
955 		}
956 
957 		indir_next = vp.vd_next;
958 		if (indir_next >= indir_count) {
959 			VIONA_PROBE3(indir_bad_next, viona_vring_t *, ring,
960 			    uint16_t, indir_next, uint16_t, indir_count);
961 			VIONA_RING_STAT_INCR(ring, indir_bad_next);
962 			err = EINVAL;
963 			break;
964 		}
965 	}
966 
967 	if (vmp != NULL) {
968 		vmm_drv_page_release(vmp);
969 	}
970 	return (err);
971 }
972 
973 int
974 vq_popchain(viona_vring_t *ring, struct iovec *iov, uint_t niov,
975     uint16_t *cookie, vmm_page_t **chain, uint32_t *len)
976 {
977 	uint16_t ndesc, idx, head, next;
978 	struct virtio_desc vdir;
979 	vq_held_region_t region = {
980 		.vhr_niov = niov,
981 		.vhr_iov = iov,
982 	};
983 
984 	ASSERT(iov != NULL);
985 	ASSERT(niov > 0 && niov < INT_MAX);
986 	ASSERT(*chain == NULL);
987 
988 	mutex_enter(&ring->vr_a_mutex);
989 	idx = ring->vr_cur_aidx;
990 	ndesc = viona_ring_num_avail(ring);
991 
992 	if (ndesc == 0) {
993 		mutex_exit(&ring->vr_a_mutex);
994 		return (0);
995 	}
996 	if (ndesc > ring->vr_size) {
997 		/*
998 		 * Despite the fact that the guest has provided an 'avail_idx'
999 		 * which indicates that an impossible number of descriptors are
1000 		 * available, continue on and attempt to process the next one.
1001 		 *
1002 		 * The transgression will not escape the probe or stats though.
1003 		 */
1004 		VIONA_PROBE2(ndesc_too_high, viona_vring_t *, ring,
1005 		    uint16_t, ndesc);
1006 		VIONA_RING_STAT_INCR(ring, ndesc_too_high);
1007 	}
1008 
1009 	head = vq_read_avail(ring, idx & ring->vr_mask);
1010 	next = head;
1011 
1012 	for (region.vhr_idx = 0; region.vhr_idx < niov; next = vdir.vd_next) {
1013 		if (next >= ring->vr_size) {
1014 			VIONA_PROBE2(bad_idx, viona_vring_t *, ring,
1015 			    uint16_t, next);
1016 			VIONA_RING_STAT_INCR(ring, bad_idx);
1017 			break;
1018 		}
1019 
1020 		vq_read_desc(ring, next, &vdir);
1021 		if ((vdir.vd_flags & VRING_DESC_F_INDIRECT) == 0) {
1022 			if (vq_map_desc_bufs(ring, &vdir, &region) != 0) {
1023 				break;
1024 			}
1025 		} else {
1026 			/*
1027 			 * Per the specification (Virtio 1.1 S2.6.5.3.1):
1028 			 *   A driver MUST NOT set both VIRTQ_DESC_F_INDIRECT
1029 			 *   and VIRTQ_DESC_F_NEXT in `flags`.
1030 			 */
1031 			if ((vdir.vd_flags & VRING_DESC_F_NEXT) != 0) {
1032 				VIONA_PROBE3(indir_bad_next,
1033 				    viona_vring_t *, ring,
1034 				    uint16_t, next, uint16_t, 0);
1035 				VIONA_RING_STAT_INCR(ring, indir_bad_next);
1036 				break;
1037 			}
1038 
1039 			if (vq_map_indir_desc_bufs(ring, &vdir, &region) != 0) {
1040 				break;
1041 			}
1042 		}
1043 
1044 		if ((vdir.vd_flags & VRING_DESC_F_NEXT) == 0) {
1045 			ring->vr_cur_aidx++;
1046 			mutex_exit(&ring->vr_a_mutex);
1047 
1048 			*cookie = head;
1049 			*chain = region.vhr_head;
1050 			if (len != NULL) {
1051 				*len = region.vhr_len;
1052 			}
1053 			return (region.vhr_idx);
1054 		}
1055 	}
1056 
1057 	mutex_exit(&ring->vr_a_mutex);
1058 	if (region.vhr_head != NULL) {
1059 		/*
1060 		 * If any pages were held prior to encountering an error, we
1061 		 * must release them now.
1062 		 */
1063 		vmm_drv_page_release_chain(region.vhr_head);
1064 	}
1065 	return (-1);
1066 }
1067 
1068 
1069 static void
1070 vq_write_used_ent(viona_vring_t *ring, uint16_t idx, uint16_t cookie,
1071     uint32_t len)
1072 {
1073 	/*
1074 	 * In a larger ring, entry could be split across pages, so be sure to
1075 	 * account for that when configuring the transfer by looking up the ID
1076 	 * and length addresses separately, rather than an address for a
1077 	 * combined `struct virtio_used`.
1078 	 */
1079 	const uint_t used_id_off = LEGACY_USED_ENT_OFF(ring->vr_size, idx);
1080 	const uint_t used_len_off = used_id_off + sizeof (uint32_t);
1081 	volatile uint32_t *idp = viona_ring_addr(ring, used_id_off);
1082 	volatile uint32_t *lenp = viona_ring_addr(ring, used_len_off);
1083 
1084 	ASSERT(MUTEX_HELD(&ring->vr_u_mutex));
1085 
1086 	*idp = cookie;
1087 	*lenp = len;
1088 }
1089 
1090 static void
1091 vq_write_used_idx(viona_vring_t *ring, uint16_t idx)
1092 {
1093 	ASSERT(MUTEX_HELD(&ring->vr_u_mutex));
1094 
1095 	volatile uint16_t *used_idx =
1096 	    viona_ring_addr(ring, LEGACY_USED_IDX_OFF(ring->vr_size));
1097 	*used_idx = idx;
1098 }
1099 
1100 void
1101 vq_pushchain(viona_vring_t *ring, uint32_t len, uint16_t cookie)
1102 {
1103 	uint16_t uidx;
1104 
1105 	mutex_enter(&ring->vr_u_mutex);
1106 
1107 	uidx = ring->vr_cur_uidx;
1108 	vq_write_used_ent(ring, uidx & ring->vr_mask, cookie, len);
1109 	uidx++;
1110 	membar_producer();
1111 
1112 	vq_write_used_idx(ring, uidx);
1113 	ring->vr_cur_uidx = uidx;
1114 
1115 	mutex_exit(&ring->vr_u_mutex);
1116 }
1117 
1118 void
1119 vq_pushchain_many(viona_vring_t *ring, uint_t num_bufs, used_elem_t *elem)
1120 {
1121 	uint16_t uidx;
1122 
1123 	mutex_enter(&ring->vr_u_mutex);
1124 
1125 	uidx = ring->vr_cur_uidx;
1126 
1127 	for (uint_t i = 0; i < num_bufs; i++, uidx++) {
1128 		vq_write_used_ent(ring, uidx & ring->vr_mask, elem[i].id,
1129 		    elem[i].len);
1130 	}
1131 
1132 	membar_producer();
1133 	vq_write_used_idx(ring, uidx);
1134 	ring->vr_cur_uidx = uidx;
1135 
1136 	mutex_exit(&ring->vr_u_mutex);
1137 }
1138 
1139 /*
1140  * Set USED_NO_NOTIFY on VQ so guest elides doorbell calls for new entries.
1141  */
1142 void
1143 viona_ring_disable_notify(viona_vring_t *ring)
1144 {
1145 	volatile uint16_t *used_flags =
1146 	    viona_ring_addr(ring, LEGACY_USED_FLAGS_OFF(ring->vr_size));
1147 
1148 	*used_flags |= VRING_USED_F_NO_NOTIFY;
1149 }
1150 
1151 /*
1152  * Clear USED_NO_NOTIFY on VQ so guest resumes doorbell calls for new entries.
1153  */
1154 void
1155 viona_ring_enable_notify(viona_vring_t *ring)
1156 {
1157 	volatile uint16_t *used_flags =
1158 	    viona_ring_addr(ring, LEGACY_USED_FLAGS_OFF(ring->vr_size));
1159 
1160 	*used_flags &= ~VRING_USED_F_NO_NOTIFY;
1161 }
1162 
1163 /*
1164  * Return the number of available descriptors in the vring taking care of the
1165  * 16-bit index wraparound.
1166  *
1167  * Note: If the number of apparently available descriptors is larger than the
1168  * ring size (due to guest misbehavior), this check will still report the
1169  * positive count of descriptors.
1170  */
1171 uint16_t
1172 viona_ring_num_avail(viona_vring_t *ring)
1173 {
1174 	volatile uint16_t *avail_idx =
1175 	    viona_ring_addr(ring, LEGACY_AVAIL_IDX_OFF(ring->vr_size));
1176 
1177 	return (*avail_idx - ring->vr_cur_aidx);
1178 }
1179 
1180 /* Record a successfully transferred packet for the ring stats */
1181 void
1182 viona_ring_stat_accept(viona_vring_t *ring, uint32_t len)
1183 {
1184 	atomic_inc_64(&ring->vr_stats.vts_packets);
1185 	atomic_add_64(&ring->vr_stats.vts_bytes, len);
1186 }
1187 
1188 /*
1189  * Record a dropped packet in the ring stats
1190  */
1191 void
1192 viona_ring_stat_drop(viona_vring_t *ring)
1193 {
1194 	atomic_inc_64(&ring->vr_stats.vts_drops);
1195 }
1196 
1197 /*
1198  * Record a packet transfer error in the ring stats
1199  */
1200 void
1201 viona_ring_stat_error(viona_vring_t *ring)
1202 {
1203 	atomic_inc_64(&ring->vr_stats.vts_errors);
1204 }
1205 
1206 /*
1207  * Consolidate statistic data for this ring into the totals for the link
1208  */
1209 static void
1210 viona_ring_consolidate_stats(viona_vring_t *ring)
1211 {
1212 	viona_link_t *link = ring->vr_link;
1213 	struct viona_transfer_stats *lstat =
1214 	    (ring == &link->l_vrings[VIONA_VQ_RX]) ?
1215 	    &link->l_stats.vls_rx : &link->l_stats.vls_tx;
1216 
1217 	mutex_enter(&link->l_stats_lock);
1218 	lstat->vts_packets += ring->vr_stats.vts_packets;
1219 	lstat->vts_bytes += ring->vr_stats.vts_bytes;
1220 	lstat->vts_drops += ring->vr_stats.vts_drops;
1221 	lstat->vts_errors += ring->vr_stats.vts_errors;
1222 	bzero(&ring->vr_stats, sizeof (ring->vr_stats));
1223 	mutex_exit(&link->l_stats_lock);
1224 }
1225 
1226 /*
1227  * Copy `sz` bytes from iovecs contained in `iob` to `dst.
1228  *
1229  * Returns `true` if copy was successful (implying adequate data was remaining
1230  * in the iov_bunch_t).
1231  */
1232 bool
1233 iov_bunch_copy(iov_bunch_t *iob, void *dst, uint32_t sz)
1234 {
1235 	if (sz > iob->ib_remain) {
1236 		return (false);
1237 	}
1238 	if (sz == 0) {
1239 		return (true);
1240 	}
1241 
1242 	caddr_t dest = dst;
1243 	do {
1244 		struct iovec *iov = iob->ib_iov;
1245 
1246 		ASSERT3U(iov->iov_len, <, UINT32_MAX);
1247 		ASSERT3U(iov->iov_len, !=, 0);
1248 
1249 		const uint32_t iov_avail = (iov->iov_len - iob->ib_offset);
1250 		const uint32_t to_copy = MIN(sz, iov_avail);
1251 
1252 		if (to_copy != 0) {
1253 			bcopy((caddr_t)iov->iov_base + iob->ib_offset, dest,
1254 			    to_copy);
1255 		}
1256 
1257 		sz -= to_copy;
1258 		iob->ib_remain -= to_copy;
1259 		dest += to_copy;
1260 		iob->ib_offset += to_copy;
1261 
1262 		ASSERT3U(iob->ib_offset, <=, iov->iov_len);
1263 
1264 		if (iob->ib_offset == iov->iov_len) {
1265 			iob->ib_iov++;
1266 			iob->ib_offset = 0;
1267 		}
1268 	} while (sz > 0);
1269 
1270 	return (true);
1271 }
1272 
1273 /*
1274  * Get the data pointer and length of the current head iovec, less any
1275  * offsetting from prior copy operations.  This will advanced the iov_bunch_t as
1276  * if the caller had performed a copy of that chunk length.
1277  *
1278  * Returns `true` if the iov_bunch_t had at least one iovec (unconsumed bytes)
1279  * remaining, setting `chunk` and `chunk_sz` to the chunk pointer and size,
1280  * respectively.
1281  */
1282 bool
1283 iov_bunch_next_chunk(iov_bunch_t *iob, caddr_t *chunk, uint32_t *chunk_sz)
1284 {
1285 	if (iob->ib_remain == 0) {
1286 		*chunk = NULL;
1287 		*chunk_sz = 0;
1288 		return (false);
1289 	}
1290 
1291 	*chunk_sz = iob->ib_iov->iov_len - iob->ib_offset;
1292 	*chunk = (caddr_t)iob->ib_iov->iov_base + iob->ib_offset;
1293 	iob->ib_remain -= *chunk_sz;
1294 	iob->ib_iov++;
1295 	iob->ib_offset = 0;
1296 	return (true);
1297 }
1298