xref: /titanic_50/usr/src/lib/mergeq/mergeq.c (revision f3e7f55e73a39377d55a030f124cc86b3b66a9cc)
1*f3e7f55eSRobert Mustacchi /*
2*f3e7f55eSRobert Mustacchi  * This file and its contents are supplied under the terms of the
3*f3e7f55eSRobert Mustacchi  * Common Development and Distribution License ("CDDL"), version 1.0.
4*f3e7f55eSRobert Mustacchi  * You may only use this file in accordance with the terms of version
5*f3e7f55eSRobert Mustacchi  * 1.0 of the CDDL.
6*f3e7f55eSRobert Mustacchi  *
7*f3e7f55eSRobert Mustacchi  * A full copy of the text of the CDDL should have accompanied this
8*f3e7f55eSRobert Mustacchi  * source.  A copy of the CDDL is also available via the Internet at
9*f3e7f55eSRobert Mustacchi  * http://www.illumos.org/license/CDDL.
10*f3e7f55eSRobert Mustacchi  */
11*f3e7f55eSRobert Mustacchi 
12*f3e7f55eSRobert Mustacchi /*
13*f3e7f55eSRobert Mustacchi  * Copyright 2015 Joyent, Inc.
14*f3e7f55eSRobert Mustacchi  */
15*f3e7f55eSRobert Mustacchi 
16*f3e7f55eSRobert Mustacchi /*
17*f3e7f55eSRobert Mustacchi  * Merge queue
18*f3e7f55eSRobert Mustacchi  *
19*f3e7f55eSRobert Mustacchi  * A multi-threaded merging queue.
20*f3e7f55eSRobert Mustacchi  *
21*f3e7f55eSRobert Mustacchi  * The general constraint of the merge queue is that if a set of items are
22*f3e7f55eSRobert Mustacchi  * inserted into the queue in the same order, then no matter how many threads
23*f3e7f55eSRobert Mustacchi  * are on the scene, we will always process the items in the same order. The
24*f3e7f55eSRobert Mustacchi  * secondary constraint is that to support environments that must be
25*f3e7f55eSRobert Mustacchi  * single-threaded, we explicitly *must not* create a thread in the case where
26*f3e7f55eSRobert Mustacchi  * the number of requested threads is just one.
27*f3e7f55eSRobert Mustacchi  *
28*f3e7f55eSRobert Mustacchi  * To that end, we've designed our queue as a circular buffer. We will grow that
29*f3e7f55eSRobert Mustacchi  * buffer to contain enough space for all the input items, after which we'll
30*f3e7f55eSRobert Mustacchi  * then treat it as a circular buffer.
31*f3e7f55eSRobert Mustacchi  *
32*f3e7f55eSRobert Mustacchi  * Items will be issued to a processing function two at a time, until there is
33*f3e7f55eSRobert Mustacchi  * only one item remaining in the queue, at which point we will be done with
34*f3e7f55eSRobert Mustacchi  * any merging work.
35*f3e7f55eSRobert Mustacchi  *
36*f3e7f55eSRobert Mustacchi  * A given queue has three different entries that we care about tracking:
37*f3e7f55eSRobert Mustacchi  *
38*f3e7f55eSRobert Mustacchi  * o mq_nproc   - What is the slot of the next item to process for something
39*f3e7f55eSRobert Mustacchi  *                looking for work.
40*f3e7f55eSRobert Mustacchi  *
41*f3e7f55eSRobert Mustacchi  * o mq_next    - What is the slot of the next item that should be inserted into
42*f3e7f55eSRobert Mustacchi  *                the queue.
43*f3e7f55eSRobert Mustacchi  *
44*f3e7f55eSRobert Mustacchi  * o mq_ncommit - What is the slot of the next item that should be committed.
45*f3e7f55eSRobert Mustacchi  *
46*f3e7f55eSRobert Mustacchi  * When a thread comes and looks for work, we pop entries off of the queue based
47*f3e7f55eSRobert Mustacchi  * on the index provided by mq_nproc. At the same time, it also gets the slot
48*f3e7f55eSRobert Mustacchi  * that it should place the result in, which is mq_next. However, because we
49*f3e7f55eSRobert Mustacchi  * have multiple threads that are operating on the system, we want to make sure
50*f3e7f55eSRobert Mustacchi  * that we push things onto the queue in order. We do that by allocating a slot
51*f3e7f55eSRobert Mustacchi  * to each task and when it completes, it waits for its slot to be ready based
52*f3e7f55eSRobert Mustacchi  * on it being the value of mq_ncommit.
53*f3e7f55eSRobert Mustacchi  *
54*f3e7f55eSRobert Mustacchi  * In addition, we keep track of the number of items in the queue as well as the
55*f3e7f55eSRobert Mustacchi  * number of active workers. There's also a generation count that is used to
56*f3e7f55eSRobert Mustacchi  * figure out when the various values might lap one another.
57*f3e7f55eSRobert Mustacchi  *
58*f3e7f55eSRobert Mustacchi  * The following images show what happens when we have a queue with six items
59*f3e7f55eSRobert Mustacchi  * and whose capacity has been shrunk to six, to better fit in the screen.
60*f3e7f55eSRobert Mustacchi  *
61*f3e7f55eSRobert Mustacchi  *
62*f3e7f55eSRobert Mustacchi  * 1) This is the initial configuration of the queue right before any processing
63*f3e7f55eSRobert Mustacchi  * is done in the context of mergeq_merge(). Every box has an initial item for
64*f3e7f55eSRobert Mustacchi  * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and
65*f3e7f55eSRobert Mustacchi  * mq_ncommit will all point at the initial entry. However, the mq_next has
66*f3e7f55eSRobert Mustacchi  * already lapped around the array and thus has a generation count of one.
67*f3e7f55eSRobert Mustacchi  *
68*f3e7f55eSRobert Mustacchi  * The '+' characters indicate which bucket each of mq_next, mq_ncommit, and
69*f3e7f55eSRobert Mustacchi  * mq_nproc currently points at.
70*f3e7f55eSRobert Mustacchi  *
71*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
72*f3e7f55eSRobert Mustacchi  *                     | X || X || X || X || X || X |
73*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
74*f3e7f55eSRobert Mustacchi  *        mq_next (g1)   +
75*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)   +
76*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)   +
77*f3e7f55eSRobert Mustacchi  *
78*f3e7f55eSRobert Mustacchi  * 2) This shows the state right as the first thread begins to process an entry.
79*f3e7f55eSRobert Mustacchi  * Note in this example we will have two threads processing this queue. Note,
80*f3e7f55eSRobert Mustacchi  * mq_ncommit has not advanced. This is because the first thread has started
81*f3e7f55eSRobert Mustacchi  * processing entries, but it has not finished, and thus we can't commit it.
82*f3e7f55eSRobert Mustacchi  * We've incremented mq_next by one because it has gone ahead and assigned a
83*f3e7f55eSRobert Mustacchi  * single entry. We've incremented mq_nproc by two, because we have removed two
84*f3e7f55eSRobert Mustacchi  * entries and thus will have another set available.
85*f3e7f55eSRobert Mustacchi  *
86*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - slot 0
87*f3e7f55eSRobert Mustacchi  *                     |   ||   || X || X || X || X |     t2 - idle
88*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
89*f3e7f55eSRobert Mustacchi  *        mq_next (g1)        +
90*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)   +
91*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)             +
92*f3e7f55eSRobert Mustacchi  *
93*f3e7f55eSRobert Mustacchi  *
94*f3e7f55eSRobert Mustacchi  * 3) This shows the state right after the second thread begins to process an
95*f3e7f55eSRobert Mustacchi  * entry, note that the first thread has not finished. The changes are very
96*f3e7f55eSRobert Mustacchi  * similar to the previous state, we've advanced, mq_nproc and mq_next, but not
97*f3e7f55eSRobert Mustacchi  * mq_ncommit.
98*f3e7f55eSRobert Mustacchi  *
99*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - slot 0
100*f3e7f55eSRobert Mustacchi  *                     |   ||   ||   ||   || X || X |     t2 - slot 1
101*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
102*f3e7f55eSRobert Mustacchi  *        mq_next (g1)             +
103*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)   +
104*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)                       +
105*f3e7f55eSRobert Mustacchi  *
106*f3e7f55eSRobert Mustacchi  * 4) This shows the state after thread one has finished processing an item, but
107*f3e7f55eSRobert Mustacchi  * before it does anything else. Note that even if thread two finishes early, it
108*f3e7f55eSRobert Mustacchi  * cannot commit its item until thread one finishes. Here 'Y' refers to the
109*f3e7f55eSRobert Mustacchi  * result of merging the first two 'X's.
110*f3e7f55eSRobert Mustacchi  *
111*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - idle
112*f3e7f55eSRobert Mustacchi  *                     | Y ||   ||   ||   || X || X |     t2 - slot 1
113*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
114*f3e7f55eSRobert Mustacchi  *        mq_next (g1)             +
115*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)        +
116*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)                       +
117*f3e7f55eSRobert Mustacchi  *
118*f3e7f55eSRobert Mustacchi  * 5) This shows the state after thread one has begun to process the next round
119*f3e7f55eSRobert Mustacchi  * and after thread two has committed, but before it begins processing the next
120*f3e7f55eSRobert Mustacchi  * item. Note that mq_nproc has wrapped around and we've bumped its generation
121*f3e7f55eSRobert Mustacchi  * counter.
122*f3e7f55eSRobert Mustacchi  *
123*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - slot 2
124*f3e7f55eSRobert Mustacchi  *                     | Y || Y ||   ||   ||   ||   |     t2 - idle
125*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
126*f3e7f55eSRobert Mustacchi  *        mq_next (g1)                  +
127*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)             +
128*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)   +
129*f3e7f55eSRobert Mustacchi  *
130*f3e7f55eSRobert Mustacchi  * 6) Here, thread two, will take the next two Y values and thread 1 will commit
131*f3e7f55eSRobert Mustacchi  * its 'Y'. Thread one now must wait until thread two finishes such that it can
132*f3e7f55eSRobert Mustacchi  * do additional work.
133*f3e7f55eSRobert Mustacchi  *
134*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - waiting
135*f3e7f55eSRobert Mustacchi  *                     |   ||   || Y ||   ||   ||   |     t2 - slot 3
136*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
137*f3e7f55eSRobert Mustacchi  *        mq_next (g1)                       +
138*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)                  +
139*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)             +
140*f3e7f55eSRobert Mustacchi  *
141*f3e7f55eSRobert Mustacchi  * 7) Here, thread two has committed and thread one is about to go process the
142*f3e7f55eSRobert Mustacchi  * final entry. The character 'Z' represents the results of merging two 'Y's.
143*f3e7f55eSRobert Mustacchi  *
144*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - idle
145*f3e7f55eSRobert Mustacchi  *                     |   ||   || Y || Z ||   ||   |     t2 - idle
146*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
147*f3e7f55eSRobert Mustacchi  *        mq_next (g1)                       +
148*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)                       +
149*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)             +
150*f3e7f55eSRobert Mustacchi  *
151*f3e7f55eSRobert Mustacchi  * 8) Here, thread one is processing the final item. Thread two is waiting in
152*f3e7f55eSRobert Mustacchi  * mergeq_pop() for enough items to be available. In this case, it will never
153*f3e7f55eSRobert Mustacchi  * happen; however, once all threads have finished it will break out.
154*f3e7f55eSRobert Mustacchi  *
155*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - slot 4
156*f3e7f55eSRobert Mustacchi  *                     |   ||   ||   ||   ||   ||   |     t2 - idle
157*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
158*f3e7f55eSRobert Mustacchi  *        mq_next (g1)                            +
159*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)                       +
160*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)                       +
161*f3e7f55eSRobert Mustacchi  *
162*f3e7f55eSRobert Mustacchi  * 9) This is the final state of the queue, it has a single '*' item which is
163*f3e7f55eSRobert Mustacchi  * the final merge result. At this point, both thread one and thread two would
164*f3e7f55eSRobert Mustacchi  * stop processing and we'll return the result to the user.
165*f3e7f55eSRobert Mustacchi  *
166*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+     t1 - slot 4
167*f3e7f55eSRobert Mustacchi  *                     |   ||   ||   ||   || * ||   |     t2 - idle
168*f3e7f55eSRobert Mustacchi  *                     +---++---++---++---++---++---+
169*f3e7f55eSRobert Mustacchi  *        mq_next (g1)                            +
170*f3e7f55eSRobert Mustacchi  *     mq_ncommit (g0)                       +
171*f3e7f55eSRobert Mustacchi  *       mq_nproc (g0)                       +
172*f3e7f55eSRobert Mustacchi  *
173*f3e7f55eSRobert Mustacchi  *
174*f3e7f55eSRobert Mustacchi  * Note, that if at any point in time the processing function fails, then all
175*f3e7f55eSRobert Mustacchi  * the merges will quiesce and that error will be propagated back to the user.
176*f3e7f55eSRobert Mustacchi  */
177*f3e7f55eSRobert Mustacchi 
178*f3e7f55eSRobert Mustacchi #include <strings.h>
179*f3e7f55eSRobert Mustacchi #include <sys/debug.h>
180*f3e7f55eSRobert Mustacchi #include <thread.h>
181*f3e7f55eSRobert Mustacchi #include <synch.h>
182*f3e7f55eSRobert Mustacchi #include <errno.h>
183*f3e7f55eSRobert Mustacchi #include <limits.h>
184*f3e7f55eSRobert Mustacchi #include <stdlib.h>
185*f3e7f55eSRobert Mustacchi 
186*f3e7f55eSRobert Mustacchi #include "mergeq.h"
187*f3e7f55eSRobert Mustacchi 
188*f3e7f55eSRobert Mustacchi struct mergeq {
189*f3e7f55eSRobert Mustacchi 	mutex_t mq_lock;	/* Protects items below */
190*f3e7f55eSRobert Mustacchi 	cond_t	mq_cond;	/* Condition variable */
191*f3e7f55eSRobert Mustacchi 	void **mq_items;	/* Array of items to process */
192*f3e7f55eSRobert Mustacchi 	size_t mq_nitems;	/* Number of items in the queue */
193*f3e7f55eSRobert Mustacchi 	size_t mq_cap;		/* Capacity of the items */
194*f3e7f55eSRobert Mustacchi 	size_t mq_next;		/* Place to put next entry */
195*f3e7f55eSRobert Mustacchi 	size_t mq_gnext;	/* Generation for next */
196*f3e7f55eSRobert Mustacchi 	size_t mq_nproc;	/* Index of next thing to process */
197*f3e7f55eSRobert Mustacchi 	size_t mq_gnproc;	/* Generation for next proc */
198*f3e7f55eSRobert Mustacchi 	size_t mq_ncommit;	/* Index of the next thing to commit */
199*f3e7f55eSRobert Mustacchi 	size_t mq_gncommit;	/* Commit generation */
200*f3e7f55eSRobert Mustacchi 	uint_t mq_nactthrs;	/* Number of active threads */
201*f3e7f55eSRobert Mustacchi 	uint_t mq_ndthreads;	/* Desired number of threads */
202*f3e7f55eSRobert Mustacchi 	thread_t *mq_thrs;	/* Actual threads */
203*f3e7f55eSRobert Mustacchi 	mergeq_proc_f *mq_func;	/* Processing function */
204*f3e7f55eSRobert Mustacchi 	void *mq_arg;		/* Argument for processing */
205*f3e7f55eSRobert Mustacchi 	boolean_t mq_working;	/* Are we working on processing */
206*f3e7f55eSRobert Mustacchi 	boolean_t mq_iserror;	/* Have we encountered an error? */
207*f3e7f55eSRobert Mustacchi 	int mq_error;
208*f3e7f55eSRobert Mustacchi };
209*f3e7f55eSRobert Mustacchi 
210*f3e7f55eSRobert Mustacchi #define	MERGEQ_DEFAULT_CAP	64
211*f3e7f55eSRobert Mustacchi 
212*f3e7f55eSRobert Mustacchi static int
mergeq_error(int err)213*f3e7f55eSRobert Mustacchi mergeq_error(int err)
214*f3e7f55eSRobert Mustacchi {
215*f3e7f55eSRobert Mustacchi 	errno = err;
216*f3e7f55eSRobert Mustacchi 	return (MERGEQ_ERROR);
217*f3e7f55eSRobert Mustacchi }
218*f3e7f55eSRobert Mustacchi 
219*f3e7f55eSRobert Mustacchi void
mergeq_fini(mergeq_t * mqp)220*f3e7f55eSRobert Mustacchi mergeq_fini(mergeq_t *mqp)
221*f3e7f55eSRobert Mustacchi {
222*f3e7f55eSRobert Mustacchi 	if (mqp == NULL)
223*f3e7f55eSRobert Mustacchi 		return;
224*f3e7f55eSRobert Mustacchi 
225*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_working != B_TRUE);
226*f3e7f55eSRobert Mustacchi 
227*f3e7f55eSRobert Mustacchi 	if (mqp->mq_items != NULL)
228*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
229*f3e7f55eSRobert Mustacchi 	if (mqp->mq_ndthreads > 0) {
230*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp->mq_thrs, sizeof (thread_t) *
231*f3e7f55eSRobert Mustacchi 		    mqp->mq_ndthreads);
232*f3e7f55eSRobert Mustacchi 	}
233*f3e7f55eSRobert Mustacchi 	VERIFY0(cond_destroy(&mqp->mq_cond));
234*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_destroy(&mqp->mq_lock));
235*f3e7f55eSRobert Mustacchi 	mergeq_free(mqp, sizeof (mergeq_t));
236*f3e7f55eSRobert Mustacchi }
237*f3e7f55eSRobert Mustacchi 
238*f3e7f55eSRobert Mustacchi int
mergeq_init(mergeq_t ** outp,uint_t nthrs)239*f3e7f55eSRobert Mustacchi mergeq_init(mergeq_t **outp, uint_t nthrs)
240*f3e7f55eSRobert Mustacchi {
241*f3e7f55eSRobert Mustacchi 	int ret;
242*f3e7f55eSRobert Mustacchi 	mergeq_t *mqp;
243*f3e7f55eSRobert Mustacchi 
244*f3e7f55eSRobert Mustacchi 	mqp = mergeq_alloc(sizeof (mergeq_t));
245*f3e7f55eSRobert Mustacchi 	if (mqp == NULL)
246*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ENOMEM));
247*f3e7f55eSRobert Mustacchi 
248*f3e7f55eSRobert Mustacchi 	bzero(mqp, sizeof (mergeq_t));
249*f3e7f55eSRobert Mustacchi 	mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP);
250*f3e7f55eSRobert Mustacchi 	if (mqp->mq_items == NULL) {
251*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp, sizeof (mergeq_t));
252*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ENOMEM));
253*f3e7f55eSRobert Mustacchi 	}
254*f3e7f55eSRobert Mustacchi 	bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP);
255*f3e7f55eSRobert Mustacchi 
256*f3e7f55eSRobert Mustacchi 	mqp->mq_ndthreads = nthrs - 1;
257*f3e7f55eSRobert Mustacchi 	if (mqp->mq_ndthreads > 0) {
258*f3e7f55eSRobert Mustacchi 		mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) *
259*f3e7f55eSRobert Mustacchi 		    mqp->mq_ndthreads);
260*f3e7f55eSRobert Mustacchi 		if (mqp->mq_thrs == NULL) {
261*f3e7f55eSRobert Mustacchi 			mergeq_free(mqp->mq_items, sizeof (void *) *
262*f3e7f55eSRobert Mustacchi 			    MERGEQ_DEFAULT_CAP);
263*f3e7f55eSRobert Mustacchi 			mergeq_free(mqp, sizeof (mergeq_t));
264*f3e7f55eSRobert Mustacchi 			return (mergeq_error(ENOMEM));
265*f3e7f55eSRobert Mustacchi 		}
266*f3e7f55eSRobert Mustacchi 	}
267*f3e7f55eSRobert Mustacchi 
268*f3e7f55eSRobert Mustacchi 	if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
269*f3e7f55eSRobert Mustacchi 	    NULL)) != 0) {
270*f3e7f55eSRobert Mustacchi 		if (mqp->mq_ndthreads > 0) {
271*f3e7f55eSRobert Mustacchi 			mergeq_free(mqp->mq_thrs,
272*f3e7f55eSRobert Mustacchi 			    sizeof (thread_t) * mqp->mq_ndthreads);
273*f3e7f55eSRobert Mustacchi 		}
274*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp->mq_items, sizeof (void *) *
275*f3e7f55eSRobert Mustacchi 		    MERGEQ_DEFAULT_CAP);
276*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp, sizeof (mergeq_t));
277*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ret));
278*f3e7f55eSRobert Mustacchi 	}
279*f3e7f55eSRobert Mustacchi 
280*f3e7f55eSRobert Mustacchi 	if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) {
281*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_destroy(&mqp->mq_lock));
282*f3e7f55eSRobert Mustacchi 		if (mqp->mq_ndthreads > 0) {
283*f3e7f55eSRobert Mustacchi 			mergeq_free(mqp->mq_thrs,
284*f3e7f55eSRobert Mustacchi 			    sizeof (thread_t) * mqp->mq_ndthreads);
285*f3e7f55eSRobert Mustacchi 		}
286*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp->mq_items, sizeof (void *) *
287*f3e7f55eSRobert Mustacchi 		    MERGEQ_DEFAULT_CAP);
288*f3e7f55eSRobert Mustacchi 		mergeq_free(mqp, sizeof (mergeq_t));
289*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ret));
290*f3e7f55eSRobert Mustacchi 	}
291*f3e7f55eSRobert Mustacchi 
292*f3e7f55eSRobert Mustacchi 	mqp->mq_cap = MERGEQ_DEFAULT_CAP;
293*f3e7f55eSRobert Mustacchi 	*outp = mqp;
294*f3e7f55eSRobert Mustacchi 	return (0);
295*f3e7f55eSRobert Mustacchi }
296*f3e7f55eSRobert Mustacchi 
297*f3e7f55eSRobert Mustacchi static void
mergeq_reset(mergeq_t * mqp)298*f3e7f55eSRobert Mustacchi mergeq_reset(mergeq_t *mqp)
299*f3e7f55eSRobert Mustacchi {
300*f3e7f55eSRobert Mustacchi 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
301*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_working == B_FALSE);
302*f3e7f55eSRobert Mustacchi 	if (mqp->mq_cap != 0)
303*f3e7f55eSRobert Mustacchi 		bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
304*f3e7f55eSRobert Mustacchi 	mqp->mq_nitems = 0;
305*f3e7f55eSRobert Mustacchi 	mqp->mq_next = 0;
306*f3e7f55eSRobert Mustacchi 	mqp->mq_gnext = 0;
307*f3e7f55eSRobert Mustacchi 	mqp->mq_nproc = 0;
308*f3e7f55eSRobert Mustacchi 	mqp->mq_gnproc = 0;
309*f3e7f55eSRobert Mustacchi 	mqp->mq_ncommit = 0;
310*f3e7f55eSRobert Mustacchi 	mqp->mq_gncommit = 0;
311*f3e7f55eSRobert Mustacchi 	mqp->mq_func = NULL;
312*f3e7f55eSRobert Mustacchi 	mqp->mq_arg = NULL;
313*f3e7f55eSRobert Mustacchi 	mqp->mq_iserror = B_FALSE;
314*f3e7f55eSRobert Mustacchi 	mqp->mq_error = 0;
315*f3e7f55eSRobert Mustacchi }
316*f3e7f55eSRobert Mustacchi 
317*f3e7f55eSRobert Mustacchi static int
mergeq_grow(mergeq_t * mqp)318*f3e7f55eSRobert Mustacchi mergeq_grow(mergeq_t *mqp)
319*f3e7f55eSRobert Mustacchi {
320*f3e7f55eSRobert Mustacchi 	size_t ncap;
321*f3e7f55eSRobert Mustacchi 	void **items;
322*f3e7f55eSRobert Mustacchi 
323*f3e7f55eSRobert Mustacchi 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
324*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_working == B_FALSE);
325*f3e7f55eSRobert Mustacchi 
326*f3e7f55eSRobert Mustacchi 	if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP)
327*f3e7f55eSRobert Mustacchi 		return (ENOSPC);
328*f3e7f55eSRobert Mustacchi 
329*f3e7f55eSRobert Mustacchi 	ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP;
330*f3e7f55eSRobert Mustacchi 	items = mergeq_alloc(ncap * sizeof (void *));
331*f3e7f55eSRobert Mustacchi 	if (items == NULL)
332*f3e7f55eSRobert Mustacchi 		return (ENOMEM);
333*f3e7f55eSRobert Mustacchi 
334*f3e7f55eSRobert Mustacchi 	bzero(items, ncap * sizeof (void *));
335*f3e7f55eSRobert Mustacchi 	bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *));
336*f3e7f55eSRobert Mustacchi 	mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *));
337*f3e7f55eSRobert Mustacchi 	mqp->mq_items = items;
338*f3e7f55eSRobert Mustacchi 	mqp->mq_cap = ncap;
339*f3e7f55eSRobert Mustacchi 	return (0);
340*f3e7f55eSRobert Mustacchi }
341*f3e7f55eSRobert Mustacchi 
342*f3e7f55eSRobert Mustacchi int
mergeq_add(mergeq_t * mqp,void * item)343*f3e7f55eSRobert Mustacchi mergeq_add(mergeq_t *mqp, void *item)
344*f3e7f55eSRobert Mustacchi {
345*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_lock(&mqp->mq_lock));
346*f3e7f55eSRobert Mustacchi 	if (mqp->mq_working == B_TRUE) {
347*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_unlock(&mqp->mq_lock));
348*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ENXIO));
349*f3e7f55eSRobert Mustacchi 	}
350*f3e7f55eSRobert Mustacchi 
351*f3e7f55eSRobert Mustacchi 	if (mqp->mq_next == mqp->mq_cap) {
352*f3e7f55eSRobert Mustacchi 		int ret;
353*f3e7f55eSRobert Mustacchi 
354*f3e7f55eSRobert Mustacchi 		if ((ret = mergeq_grow(mqp)) != 0) {
355*f3e7f55eSRobert Mustacchi 			VERIFY0(mutex_unlock(&mqp->mq_lock));
356*f3e7f55eSRobert Mustacchi 			return (mergeq_error(ret));
357*f3e7f55eSRobert Mustacchi 		}
358*f3e7f55eSRobert Mustacchi 	}
359*f3e7f55eSRobert Mustacchi 	mqp->mq_items[mqp->mq_next] = item;
360*f3e7f55eSRobert Mustacchi 	mqp->mq_next++;
361*f3e7f55eSRobert Mustacchi 	mqp->mq_nitems++;
362*f3e7f55eSRobert Mustacchi 
363*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_unlock(&mqp->mq_lock));
364*f3e7f55eSRobert Mustacchi 	return (0);
365*f3e7f55eSRobert Mustacchi }
366*f3e7f55eSRobert Mustacchi 
367*f3e7f55eSRobert Mustacchi static size_t
mergeq_slot(mergeq_t * mqp)368*f3e7f55eSRobert Mustacchi mergeq_slot(mergeq_t *mqp)
369*f3e7f55eSRobert Mustacchi {
370*f3e7f55eSRobert Mustacchi 	size_t s;
371*f3e7f55eSRobert Mustacchi 
372*f3e7f55eSRobert Mustacchi 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
373*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_next < mqp->mq_cap);
374*f3e7f55eSRobert Mustacchi 
375*f3e7f55eSRobert Mustacchi 	/*
376*f3e7f55eSRobert Mustacchi 	 * This probably should be a cv / wait thing.
377*f3e7f55eSRobert Mustacchi 	 */
378*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap);
379*f3e7f55eSRobert Mustacchi 
380*f3e7f55eSRobert Mustacchi 	s = mqp->mq_next;
381*f3e7f55eSRobert Mustacchi 	mqp->mq_next++;
382*f3e7f55eSRobert Mustacchi 	if (mqp->mq_next == mqp->mq_cap) {
383*f3e7f55eSRobert Mustacchi 		mqp->mq_next %= mqp->mq_cap;
384*f3e7f55eSRobert Mustacchi 		mqp->mq_gnext++;
385*f3e7f55eSRobert Mustacchi 	}
386*f3e7f55eSRobert Mustacchi 
387*f3e7f55eSRobert Mustacchi 	return (s);
388*f3e7f55eSRobert Mustacchi }
389*f3e7f55eSRobert Mustacchi 
390*f3e7f55eSRobert Mustacchi /*
391*f3e7f55eSRobert Mustacchi  * Internal function to push items onto the queue which is now a circular
392*f3e7f55eSRobert Mustacchi  * buffer. This should only be used once we begin working on the queue.
393*f3e7f55eSRobert Mustacchi  */
394*f3e7f55eSRobert Mustacchi static void
mergeq_push(mergeq_t * mqp,size_t slot,void * item)395*f3e7f55eSRobert Mustacchi mergeq_push(mergeq_t *mqp, size_t slot, void *item)
396*f3e7f55eSRobert Mustacchi {
397*f3e7f55eSRobert Mustacchi 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
398*f3e7f55eSRobert Mustacchi 	VERIFY(slot < mqp->mq_cap);
399*f3e7f55eSRobert Mustacchi 
400*f3e7f55eSRobert Mustacchi 	/*
401*f3e7f55eSRobert Mustacchi 	 * We need to verify that we don't push over something that exists.
402*f3e7f55eSRobert Mustacchi 	 * Based on the design, this should never happen. However, in the face
403*f3e7f55eSRobert Mustacchi 	 * of bugs, anything is possible.
404*f3e7f55eSRobert Mustacchi 	 */
405*f3e7f55eSRobert Mustacchi 	while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE)
406*f3e7f55eSRobert Mustacchi 		(void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
407*f3e7f55eSRobert Mustacchi 
408*f3e7f55eSRobert Mustacchi 	if (mqp->mq_iserror == B_TRUE)
409*f3e7f55eSRobert Mustacchi 		return;
410*f3e7f55eSRobert Mustacchi 
411*f3e7f55eSRobert Mustacchi 	mqp->mq_items[slot] = item;
412*f3e7f55eSRobert Mustacchi 	mqp->mq_nitems++;
413*f3e7f55eSRobert Mustacchi 	mqp->mq_ncommit++;
414*f3e7f55eSRobert Mustacchi 	if (mqp->mq_ncommit == mqp->mq_cap) {
415*f3e7f55eSRobert Mustacchi 		mqp->mq_ncommit %= mqp->mq_cap;
416*f3e7f55eSRobert Mustacchi 		mqp->mq_gncommit++;
417*f3e7f55eSRobert Mustacchi 	}
418*f3e7f55eSRobert Mustacchi 	(void) cond_broadcast(&mqp->mq_cond);
419*f3e7f55eSRobert Mustacchi }
420*f3e7f55eSRobert Mustacchi 
421*f3e7f55eSRobert Mustacchi static void *
mergeq_pop_one(mergeq_t * mqp)422*f3e7f55eSRobert Mustacchi mergeq_pop_one(mergeq_t *mqp)
423*f3e7f55eSRobert Mustacchi {
424*f3e7f55eSRobert Mustacchi 	void *out;
425*f3e7f55eSRobert Mustacchi 
426*f3e7f55eSRobert Mustacchi 	/*
427*f3e7f55eSRobert Mustacchi 	 * We can't move mq_nproc beyond mq_next if they're on the same
428*f3e7f55eSRobert Mustacchi 	 * generation.
429*f3e7f55eSRobert Mustacchi 	 */
430*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_gnext != mqp->mq_gnproc ||
431*f3e7f55eSRobert Mustacchi 	    mqp->mq_nproc != mqp->mq_next);
432*f3e7f55eSRobert Mustacchi 
433*f3e7f55eSRobert Mustacchi 	out = mqp->mq_items[mqp->mq_nproc];
434*f3e7f55eSRobert Mustacchi 
435*f3e7f55eSRobert Mustacchi 	mqp->mq_items[mqp->mq_nproc] = NULL;
436*f3e7f55eSRobert Mustacchi 	mqp->mq_nproc++;
437*f3e7f55eSRobert Mustacchi 	if (mqp->mq_nproc == mqp->mq_cap) {
438*f3e7f55eSRobert Mustacchi 		mqp->mq_nproc %= mqp->mq_cap;
439*f3e7f55eSRobert Mustacchi 		mqp->mq_gnproc++;
440*f3e7f55eSRobert Mustacchi 	}
441*f3e7f55eSRobert Mustacchi 	mqp->mq_nitems--;
442*f3e7f55eSRobert Mustacchi 
443*f3e7f55eSRobert Mustacchi 	return (out);
444*f3e7f55eSRobert Mustacchi }
445*f3e7f55eSRobert Mustacchi 
446*f3e7f55eSRobert Mustacchi /*
447*f3e7f55eSRobert Mustacchi  * Pop a set of two entries from the queue. We may not have anything to process
448*f3e7f55eSRobert Mustacchi  * at the moment, eg. be waiting for someone to add something. In which case,
449*f3e7f55eSRobert Mustacchi  * we'll be sitting and waiting.
450*f3e7f55eSRobert Mustacchi  */
451*f3e7f55eSRobert Mustacchi static boolean_t
mergeq_pop(mergeq_t * mqp,void ** first,void ** second)452*f3e7f55eSRobert Mustacchi mergeq_pop(mergeq_t *mqp, void **first, void **second)
453*f3e7f55eSRobert Mustacchi {
454*f3e7f55eSRobert Mustacchi 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
455*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_nproc < mqp->mq_cap);
456*f3e7f55eSRobert Mustacchi 
457*f3e7f55eSRobert Mustacchi 	while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 &&
458*f3e7f55eSRobert Mustacchi 	    mqp->mq_iserror == B_FALSE)
459*f3e7f55eSRobert Mustacchi 		(void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
460*f3e7f55eSRobert Mustacchi 
461*f3e7f55eSRobert Mustacchi 	if (mqp->mq_iserror == B_TRUE)
462*f3e7f55eSRobert Mustacchi 		return (B_FALSE);
463*f3e7f55eSRobert Mustacchi 
464*f3e7f55eSRobert Mustacchi 	if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) {
465*f3e7f55eSRobert Mustacchi 		VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1);
466*f3e7f55eSRobert Mustacchi 		return (B_FALSE);
467*f3e7f55eSRobert Mustacchi 	}
468*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_nitems >= 2);
469*f3e7f55eSRobert Mustacchi 
470*f3e7f55eSRobert Mustacchi 	*first = mergeq_pop_one(mqp);
471*f3e7f55eSRobert Mustacchi 	*second = mergeq_pop_one(mqp);
472*f3e7f55eSRobert Mustacchi 
473*f3e7f55eSRobert Mustacchi 	return (B_TRUE);
474*f3e7f55eSRobert Mustacchi }
475*f3e7f55eSRobert Mustacchi 
476*f3e7f55eSRobert Mustacchi static void *
mergeq_thr_merge(void * arg)477*f3e7f55eSRobert Mustacchi mergeq_thr_merge(void *arg)
478*f3e7f55eSRobert Mustacchi {
479*f3e7f55eSRobert Mustacchi 	mergeq_t *mqp = arg;
480*f3e7f55eSRobert Mustacchi 
481*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_lock(&mqp->mq_lock));
482*f3e7f55eSRobert Mustacchi 
483*f3e7f55eSRobert Mustacchi 	/*
484*f3e7f55eSRobert Mustacchi 	 * Check to make sure creation worked and if not, fail fast.
485*f3e7f55eSRobert Mustacchi 	 */
486*f3e7f55eSRobert Mustacchi 	if (mqp->mq_iserror == B_TRUE) {
487*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_unlock(&mqp->mq_lock));
488*f3e7f55eSRobert Mustacchi 		return (NULL);
489*f3e7f55eSRobert Mustacchi 	}
490*f3e7f55eSRobert Mustacchi 
491*f3e7f55eSRobert Mustacchi 	for (;;) {
492*f3e7f55eSRobert Mustacchi 		void *first, *second, *out;
493*f3e7f55eSRobert Mustacchi 		int ret;
494*f3e7f55eSRobert Mustacchi 		size_t slot;
495*f3e7f55eSRobert Mustacchi 
496*f3e7f55eSRobert Mustacchi 		if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) {
497*f3e7f55eSRobert Mustacchi 			VERIFY0(mutex_unlock(&mqp->mq_lock));
498*f3e7f55eSRobert Mustacchi 			return (NULL);
499*f3e7f55eSRobert Mustacchi 		}
500*f3e7f55eSRobert Mustacchi 
501*f3e7f55eSRobert Mustacchi 		if (mergeq_pop(mqp, &first, &second) == B_FALSE) {
502*f3e7f55eSRobert Mustacchi 			VERIFY0(mutex_unlock(&mqp->mq_lock));
503*f3e7f55eSRobert Mustacchi 			return (NULL);
504*f3e7f55eSRobert Mustacchi 		}
505*f3e7f55eSRobert Mustacchi 		slot = mergeq_slot(mqp);
506*f3e7f55eSRobert Mustacchi 
507*f3e7f55eSRobert Mustacchi 		mqp->mq_nactthrs++;
508*f3e7f55eSRobert Mustacchi 
509*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_unlock(&mqp->mq_lock));
510*f3e7f55eSRobert Mustacchi 		ret = mqp->mq_func(first, second, &out, mqp->mq_arg);
511*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_lock(&mqp->mq_lock));
512*f3e7f55eSRobert Mustacchi 
513*f3e7f55eSRobert Mustacchi 		if (ret != 0) {
514*f3e7f55eSRobert Mustacchi 			if (mqp->mq_iserror == B_FALSE) {
515*f3e7f55eSRobert Mustacchi 				mqp->mq_iserror = B_TRUE;
516*f3e7f55eSRobert Mustacchi 				mqp->mq_error = ret;
517*f3e7f55eSRobert Mustacchi 				(void) cond_broadcast(&mqp->mq_cond);
518*f3e7f55eSRobert Mustacchi 			}
519*f3e7f55eSRobert Mustacchi 			mqp->mq_nactthrs--;
520*f3e7f55eSRobert Mustacchi 			VERIFY0(mutex_unlock(&mqp->mq_lock));
521*f3e7f55eSRobert Mustacchi 			return (NULL);
522*f3e7f55eSRobert Mustacchi 		}
523*f3e7f55eSRobert Mustacchi 		mergeq_push(mqp, slot, out);
524*f3e7f55eSRobert Mustacchi 		mqp->mq_nactthrs--;
525*f3e7f55eSRobert Mustacchi 	}
526*f3e7f55eSRobert Mustacchi }
527*f3e7f55eSRobert Mustacchi 
528*f3e7f55eSRobert Mustacchi int
mergeq_merge(mergeq_t * mqp,mergeq_proc_f * func,void * arg,void ** outp,int * errp)529*f3e7f55eSRobert Mustacchi mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp,
530*f3e7f55eSRobert Mustacchi     int *errp)
531*f3e7f55eSRobert Mustacchi {
532*f3e7f55eSRobert Mustacchi 	int ret, i;
533*f3e7f55eSRobert Mustacchi 	boolean_t seterr = B_FALSE;
534*f3e7f55eSRobert Mustacchi 
535*f3e7f55eSRobert Mustacchi 	if (mqp == NULL || func == NULL || outp == NULL) {
536*f3e7f55eSRobert Mustacchi 		return (mergeq_error(EINVAL));
537*f3e7f55eSRobert Mustacchi 	}
538*f3e7f55eSRobert Mustacchi 
539*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_lock(&mqp->mq_lock));
540*f3e7f55eSRobert Mustacchi 	if (mqp->mq_working == B_TRUE) {
541*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_unlock(&mqp->mq_lock));
542*f3e7f55eSRobert Mustacchi 		return (mergeq_error(EBUSY));
543*f3e7f55eSRobert Mustacchi 	}
544*f3e7f55eSRobert Mustacchi 
545*f3e7f55eSRobert Mustacchi 	if (mqp->mq_nitems == 0) {
546*f3e7f55eSRobert Mustacchi 		*outp = NULL;
547*f3e7f55eSRobert Mustacchi 		mergeq_reset(mqp);
548*f3e7f55eSRobert Mustacchi 		VERIFY0(mutex_unlock(&mqp->mq_lock));
549*f3e7f55eSRobert Mustacchi 		return (0);
550*f3e7f55eSRobert Mustacchi 	}
551*f3e7f55eSRobert Mustacchi 
552*f3e7f55eSRobert Mustacchi 	/*
553*f3e7f55eSRobert Mustacchi 	 * Now that we've finished adding items to the queue, turn it into a
554*f3e7f55eSRobert Mustacchi 	 * circular buffer.
555*f3e7f55eSRobert Mustacchi 	 */
556*f3e7f55eSRobert Mustacchi 	mqp->mq_func = func;
557*f3e7f55eSRobert Mustacchi 	mqp->mq_arg = arg;
558*f3e7f55eSRobert Mustacchi 	mqp->mq_nproc = 0;
559*f3e7f55eSRobert Mustacchi 	mqp->mq_working = B_TRUE;
560*f3e7f55eSRobert Mustacchi 	if (mqp->mq_next == mqp->mq_cap) {
561*f3e7f55eSRobert Mustacchi 		mqp->mq_next %= mqp->mq_cap;
562*f3e7f55eSRobert Mustacchi 		mqp->mq_gnext++;
563*f3e7f55eSRobert Mustacchi 	}
564*f3e7f55eSRobert Mustacchi 	mqp->mq_ncommit = mqp->mq_next;
565*f3e7f55eSRobert Mustacchi 
566*f3e7f55eSRobert Mustacchi 	ret = 0;
567*f3e7f55eSRobert Mustacchi 	for (i = 0; i < mqp->mq_ndthreads; i++) {
568*f3e7f55eSRobert Mustacchi 		ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0,
569*f3e7f55eSRobert Mustacchi 		    &mqp->mq_thrs[i]);
570*f3e7f55eSRobert Mustacchi 		if (ret != 0) {
571*f3e7f55eSRobert Mustacchi 			mqp->mq_iserror = B_TRUE;
572*f3e7f55eSRobert Mustacchi 			break;
573*f3e7f55eSRobert Mustacchi 		}
574*f3e7f55eSRobert Mustacchi 	}
575*f3e7f55eSRobert Mustacchi 
576*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_unlock(&mqp->mq_lock));
577*f3e7f55eSRobert Mustacchi 	if (ret == 0)
578*f3e7f55eSRobert Mustacchi 		(void) mergeq_thr_merge(mqp);
579*f3e7f55eSRobert Mustacchi 
580*f3e7f55eSRobert Mustacchi 	for (i = 0; i < mqp->mq_ndthreads; i++) {
581*f3e7f55eSRobert Mustacchi 		VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL));
582*f3e7f55eSRobert Mustacchi 	}
583*f3e7f55eSRobert Mustacchi 
584*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_lock(&mqp->mq_lock));
585*f3e7f55eSRobert Mustacchi 
586*f3e7f55eSRobert Mustacchi 	VERIFY(mqp->mq_nactthrs == 0);
587*f3e7f55eSRobert Mustacchi 	mqp->mq_working = B_FALSE;
588*f3e7f55eSRobert Mustacchi 	if (ret == 0 && mqp->mq_iserror == B_FALSE) {
589*f3e7f55eSRobert Mustacchi 		VERIFY(mqp->mq_nitems == 1);
590*f3e7f55eSRobert Mustacchi 		*outp = mergeq_pop_one(mqp);
591*f3e7f55eSRobert Mustacchi 	} else if (ret == 0 && mqp->mq_iserror == B_TRUE) {
592*f3e7f55eSRobert Mustacchi 		ret = MERGEQ_UERROR;
593*f3e7f55eSRobert Mustacchi 		if (errp != NULL)
594*f3e7f55eSRobert Mustacchi 			*errp = mqp->mq_error;
595*f3e7f55eSRobert Mustacchi 	} else {
596*f3e7f55eSRobert Mustacchi 		seterr = B_TRUE;
597*f3e7f55eSRobert Mustacchi 	}
598*f3e7f55eSRobert Mustacchi 
599*f3e7f55eSRobert Mustacchi 	mergeq_reset(mqp);
600*f3e7f55eSRobert Mustacchi 	VERIFY0(mutex_unlock(&mqp->mq_lock));
601*f3e7f55eSRobert Mustacchi 
602*f3e7f55eSRobert Mustacchi 	if (seterr == B_TRUE)
603*f3e7f55eSRobert Mustacchi 		return (mergeq_error(ret));
604*f3e7f55eSRobert Mustacchi 
605*f3e7f55eSRobert Mustacchi 	return (ret);
606*f3e7f55eSRobert Mustacchi }
607