1*7fd79137SRobert Mustacchi /*
2*7fd79137SRobert Mustacchi * This file and its contents are supplied under the terms of the
3*7fd79137SRobert Mustacchi * Common Development and Distribution License ("CDDL"), version 1.0.
4*7fd79137SRobert Mustacchi * You may only use this file in accordance with the terms of version
5*7fd79137SRobert Mustacchi * 1.0 of the CDDL.
6*7fd79137SRobert Mustacchi *
7*7fd79137SRobert Mustacchi * A full copy of the text of the CDDL should have accompanied this
8*7fd79137SRobert Mustacchi * source. A copy of the CDDL is also available via the Internet at
9*7fd79137SRobert Mustacchi * http://www.illumos.org/license/CDDL.
10*7fd79137SRobert Mustacchi */
11*7fd79137SRobert Mustacchi
12*7fd79137SRobert Mustacchi /*
13*7fd79137SRobert Mustacchi * Copyright 2015 Joyent, Inc.
14*7fd79137SRobert Mustacchi */
15*7fd79137SRobert Mustacchi
16*7fd79137SRobert Mustacchi /*
17*7fd79137SRobert Mustacchi * Merge queue
18*7fd79137SRobert Mustacchi *
19*7fd79137SRobert Mustacchi * A multi-threaded merging queue.
20*7fd79137SRobert Mustacchi *
21*7fd79137SRobert Mustacchi * The general constraint of the merge queue is that if a set of items are
22*7fd79137SRobert Mustacchi * inserted into the queue in the same order, then no matter how many threads
23*7fd79137SRobert Mustacchi * are on the scene, we will always process the items in the same order. The
24*7fd79137SRobert Mustacchi * secondary constraint is that to support environments that must be
25*7fd79137SRobert Mustacchi * single-threaded, we explicitly *must not* create a thread in the case where
26*7fd79137SRobert Mustacchi * the number of requested threads is just one.
27*7fd79137SRobert Mustacchi *
28*7fd79137SRobert Mustacchi * To that end, we've designed our queue as a circular buffer. We will grow that
29*7fd79137SRobert Mustacchi * buffer to contain enough space for all the input items, after which we'll
30*7fd79137SRobert Mustacchi * then treat it as a circular buffer.
31*7fd79137SRobert Mustacchi *
 * Items will be issued to a processing function two at a time, until there is
 * only one item remaining in the queue, at which point we will be done with
 * all merging work.
35*7fd79137SRobert Mustacchi *
36*7fd79137SRobert Mustacchi * A given queue has three different entries that we care about tracking:
37*7fd79137SRobert Mustacchi *
38*7fd79137SRobert Mustacchi * o mq_nproc - What is the slot of the next item to process for something
39*7fd79137SRobert Mustacchi * looking for work.
40*7fd79137SRobert Mustacchi *
41*7fd79137SRobert Mustacchi * o mq_next - What is the slot of the next item that should be inserted into
42*7fd79137SRobert Mustacchi * the queue.
43*7fd79137SRobert Mustacchi *
44*7fd79137SRobert Mustacchi * o mq_ncommit - What is the slot of the next item that should be committed.
45*7fd79137SRobert Mustacchi *
46*7fd79137SRobert Mustacchi * When a thread comes and looks for work, we pop entries off of the queue based
47*7fd79137SRobert Mustacchi * on the index provided by mq_nproc. At the same time, it also gets the slot
48*7fd79137SRobert Mustacchi * that it should place the result in, which is mq_next. However, because we
49*7fd79137SRobert Mustacchi * have multiple threads that are operating on the system, we want to make sure
50*7fd79137SRobert Mustacchi * that we push things onto the queue in order. We do that by allocating a slot
51*7fd79137SRobert Mustacchi * to each task and when it completes, it waits for its slot to be ready based
52*7fd79137SRobert Mustacchi * on it being the value of mq_ncommit.
53*7fd79137SRobert Mustacchi *
54*7fd79137SRobert Mustacchi * In addition, we keep track of the number of items in the queue as well as the
55*7fd79137SRobert Mustacchi * number of active workers. There's also a generation count that is used to
56*7fd79137SRobert Mustacchi * figure out when the various values might lap one another.
57*7fd79137SRobert Mustacchi *
58*7fd79137SRobert Mustacchi * The following images show what happens when we have a queue with six items
59*7fd79137SRobert Mustacchi * and whose capacity has been shrunk to six, to better fit in the screen.
60*7fd79137SRobert Mustacchi *
61*7fd79137SRobert Mustacchi *
62*7fd79137SRobert Mustacchi * 1) This is the initial configuration of the queue right before any processing
63*7fd79137SRobert Mustacchi * is done in the context of mergeq_merge(). Every box has an initial item for
64*7fd79137SRobert Mustacchi * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and
65*7fd79137SRobert Mustacchi * mq_ncommit will all point at the initial entry. However, the mq_next has
66*7fd79137SRobert Mustacchi * already lapped around the array and thus has a generation count of one.
67*7fd79137SRobert Mustacchi *
 * The '+' characters indicate which bucket the corresponding values of
 * mq_next, mq_ncommit, and mq_nproc refer to.
70*7fd79137SRobert Mustacchi *
71*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
72*7fd79137SRobert Mustacchi * | X || X || X || X || X || X |
73*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
74*7fd79137SRobert Mustacchi * mq_next (g1) +
75*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
76*7fd79137SRobert Mustacchi * mq_nproc (g0) +
77*7fd79137SRobert Mustacchi *
78*7fd79137SRobert Mustacchi * 2) This shows the state right as the first thread begins to process an entry.
79*7fd79137SRobert Mustacchi * Note in this example we will have two threads processing this queue. Note,
80*7fd79137SRobert Mustacchi * mq_ncommit has not advanced. This is because the first thread has started
81*7fd79137SRobert Mustacchi * processing entries, but it has not finished, and thus we can't commit it.
82*7fd79137SRobert Mustacchi * We've incremented mq_next by one because it has gone ahead and assigned a
83*7fd79137SRobert Mustacchi * single entry. We've incremented mq_nproc by two, because we have removed two
84*7fd79137SRobert Mustacchi * entries and thus will have another set available.
85*7fd79137SRobert Mustacchi *
86*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 0
87*7fd79137SRobert Mustacchi * | || || X || X || X || X | t2 - idle
88*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
89*7fd79137SRobert Mustacchi * mq_next (g1) +
90*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
91*7fd79137SRobert Mustacchi * mq_nproc (g0) +
92*7fd79137SRobert Mustacchi *
93*7fd79137SRobert Mustacchi *
94*7fd79137SRobert Mustacchi * 3) This shows the state right after the second thread begins to process an
95*7fd79137SRobert Mustacchi * entry, note that the first thread has not finished. The changes are very
96*7fd79137SRobert Mustacchi * similar to the previous state, we've advanced, mq_nproc and mq_next, but not
97*7fd79137SRobert Mustacchi * mq_ncommit.
98*7fd79137SRobert Mustacchi *
99*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 0
100*7fd79137SRobert Mustacchi * | || || || || X || X | t2 - slot 1
101*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
102*7fd79137SRobert Mustacchi * mq_next (g1) +
103*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
104*7fd79137SRobert Mustacchi * mq_nproc (g0) +
105*7fd79137SRobert Mustacchi *
106*7fd79137SRobert Mustacchi * 4) This shows the state after thread one has finished processing an item, but
107*7fd79137SRobert Mustacchi * before it does anything else. Note that even if thread two finishes early, it
108*7fd79137SRobert Mustacchi * cannot commit its item until thread one finishes. Here 'Y' refers to the
109*7fd79137SRobert Mustacchi * result of merging the first two 'X's.
110*7fd79137SRobert Mustacchi *
111*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - idle
112*7fd79137SRobert Mustacchi * | Y || || || || X || X | t2 - slot 1
113*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
114*7fd79137SRobert Mustacchi * mq_next (g1) +
115*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
116*7fd79137SRobert Mustacchi * mq_nproc (g0) +
117*7fd79137SRobert Mustacchi *
118*7fd79137SRobert Mustacchi * 5) This shows the state after thread one has begun to process the next round
119*7fd79137SRobert Mustacchi * and after thread two has committed, but before it begins processing the next
120*7fd79137SRobert Mustacchi * item. Note that mq_nproc has wrapped around and we've bumped its generation
121*7fd79137SRobert Mustacchi * counter.
122*7fd79137SRobert Mustacchi *
123*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 2
124*7fd79137SRobert Mustacchi * | Y || Y || || || || | t2 - idle
125*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
126*7fd79137SRobert Mustacchi * mq_next (g1) +
127*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
128*7fd79137SRobert Mustacchi * mq_nproc (g0) +
129*7fd79137SRobert Mustacchi *
130*7fd79137SRobert Mustacchi * 6) Here, thread two, will take the next two Y values and thread 1 will commit
131*7fd79137SRobert Mustacchi * its 'Y'. Thread one now must wait until thread two finishes such that it can
132*7fd79137SRobert Mustacchi * do additional work.
133*7fd79137SRobert Mustacchi *
134*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - waiting
135*7fd79137SRobert Mustacchi * | || || Y || || || | t2 - slot 3
136*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
137*7fd79137SRobert Mustacchi * mq_next (g1) +
138*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
139*7fd79137SRobert Mustacchi * mq_nproc (g0) +
140*7fd79137SRobert Mustacchi *
141*7fd79137SRobert Mustacchi * 7) Here, thread two has committed and thread one is about to go process the
142*7fd79137SRobert Mustacchi * final entry. The character 'Z' represents the results of merging two 'Y's.
143*7fd79137SRobert Mustacchi *
144*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - idle
145*7fd79137SRobert Mustacchi * | || || Y || Z || || | t2 - idle
146*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
147*7fd79137SRobert Mustacchi * mq_next (g1) +
148*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
149*7fd79137SRobert Mustacchi * mq_nproc (g0) +
150*7fd79137SRobert Mustacchi *
151*7fd79137SRobert Mustacchi * 8) Here, thread one is processing the final item. Thread two is waiting in
152*7fd79137SRobert Mustacchi * mergeq_pop() for enough items to be available. In this case, it will never
153*7fd79137SRobert Mustacchi * happen; however, once all threads have finished it will break out.
154*7fd79137SRobert Mustacchi *
155*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 4
156*7fd79137SRobert Mustacchi * | || || || || || | t2 - idle
157*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
158*7fd79137SRobert Mustacchi * mq_next (g1) +
159*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
160*7fd79137SRobert Mustacchi * mq_nproc (g0) +
161*7fd79137SRobert Mustacchi *
162*7fd79137SRobert Mustacchi * 9) This is the final state of the queue, it has a single '*' item which is
163*7fd79137SRobert Mustacchi * the final merge result. At this point, both thread one and thread two would
164*7fd79137SRobert Mustacchi * stop processing and we'll return the result to the user.
165*7fd79137SRobert Mustacchi *
166*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 4
167*7fd79137SRobert Mustacchi * | || || || || * || | t2 - idle
168*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+
169*7fd79137SRobert Mustacchi * mq_next (g1) +
170*7fd79137SRobert Mustacchi * mq_ncommit (g0) +
171*7fd79137SRobert Mustacchi * mq_nproc (g0) +
172*7fd79137SRobert Mustacchi *
173*7fd79137SRobert Mustacchi *
174*7fd79137SRobert Mustacchi * Note, that if at any point in time the processing function fails, then all
175*7fd79137SRobert Mustacchi * the merges will quiesce and that error will be propagated back to the user.
176*7fd79137SRobert Mustacchi */
177*7fd79137SRobert Mustacchi
178*7fd79137SRobert Mustacchi #include <strings.h>
179*7fd79137SRobert Mustacchi #include <sys/debug.h>
180*7fd79137SRobert Mustacchi #include <thread.h>
181*7fd79137SRobert Mustacchi #include <synch.h>
182*7fd79137SRobert Mustacchi #include <errno.h>
183*7fd79137SRobert Mustacchi #include <limits.h>
184*7fd79137SRobert Mustacchi #include <stdlib.h>
185*7fd79137SRobert Mustacchi
186*7fd79137SRobert Mustacchi #include "mergeq.h"
187*7fd79137SRobert Mustacchi
188*7fd79137SRobert Mustacchi struct mergeq {
189*7fd79137SRobert Mustacchi mutex_t mq_lock; /* Protects items below */
190*7fd79137SRobert Mustacchi cond_t mq_cond; /* Condition variable */
191*7fd79137SRobert Mustacchi void **mq_items; /* Array of items to process */
192*7fd79137SRobert Mustacchi size_t mq_nitems; /* Number of items in the queue */
193*7fd79137SRobert Mustacchi size_t mq_cap; /* Capacity of the items */
194*7fd79137SRobert Mustacchi size_t mq_next; /* Place to put next entry */
195*7fd79137SRobert Mustacchi size_t mq_gnext; /* Generation for next */
196*7fd79137SRobert Mustacchi size_t mq_nproc; /* Index of next thing to process */
197*7fd79137SRobert Mustacchi size_t mq_gnproc; /* Generation for next proc */
198*7fd79137SRobert Mustacchi size_t mq_ncommit; /* Index of the next thing to commit */
199*7fd79137SRobert Mustacchi size_t mq_gncommit; /* Commit generation */
200*7fd79137SRobert Mustacchi uint_t mq_nactthrs; /* Number of active threads */
201*7fd79137SRobert Mustacchi uint_t mq_ndthreads; /* Desired number of threads */
202*7fd79137SRobert Mustacchi thread_t *mq_thrs; /* Actual threads */
203*7fd79137SRobert Mustacchi mergeq_proc_f *mq_func; /* Processing function */
204*7fd79137SRobert Mustacchi void *mq_arg; /* Argument for processing */
205*7fd79137SRobert Mustacchi boolean_t mq_working; /* Are we working on processing */
206*7fd79137SRobert Mustacchi boolean_t mq_iserror; /* Have we encountered an error? */
207*7fd79137SRobert Mustacchi int mq_error;
208*7fd79137SRobert Mustacchi };
209*7fd79137SRobert Mustacchi
210*7fd79137SRobert Mustacchi #define MERGEQ_DEFAULT_CAP 64
211*7fd79137SRobert Mustacchi
212*7fd79137SRobert Mustacchi static int
mergeq_error(int err)213*7fd79137SRobert Mustacchi mergeq_error(int err)
214*7fd79137SRobert Mustacchi {
215*7fd79137SRobert Mustacchi errno = err;
216*7fd79137SRobert Mustacchi return (MERGEQ_ERROR);
217*7fd79137SRobert Mustacchi }
218*7fd79137SRobert Mustacchi
219*7fd79137SRobert Mustacchi void
mergeq_fini(mergeq_t * mqp)220*7fd79137SRobert Mustacchi mergeq_fini(mergeq_t *mqp)
221*7fd79137SRobert Mustacchi {
222*7fd79137SRobert Mustacchi if (mqp == NULL)
223*7fd79137SRobert Mustacchi return;
224*7fd79137SRobert Mustacchi
225*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working != B_TRUE);
226*7fd79137SRobert Mustacchi
227*7fd79137SRobert Mustacchi if (mqp->mq_items != NULL)
228*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
229*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) {
230*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs, sizeof (thread_t) *
231*7fd79137SRobert Mustacchi mqp->mq_ndthreads);
232*7fd79137SRobert Mustacchi }
233*7fd79137SRobert Mustacchi VERIFY0(cond_destroy(&mqp->mq_cond));
234*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&mqp->mq_lock));
235*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t));
236*7fd79137SRobert Mustacchi }
237*7fd79137SRobert Mustacchi
238*7fd79137SRobert Mustacchi int
mergeq_init(mergeq_t ** outp,uint_t nthrs)239*7fd79137SRobert Mustacchi mergeq_init(mergeq_t **outp, uint_t nthrs)
240*7fd79137SRobert Mustacchi {
241*7fd79137SRobert Mustacchi int ret;
242*7fd79137SRobert Mustacchi mergeq_t *mqp;
243*7fd79137SRobert Mustacchi
244*7fd79137SRobert Mustacchi mqp = mergeq_alloc(sizeof (mergeq_t));
245*7fd79137SRobert Mustacchi if (mqp == NULL)
246*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM));
247*7fd79137SRobert Mustacchi
248*7fd79137SRobert Mustacchi bzero(mqp, sizeof (mergeq_t));
249*7fd79137SRobert Mustacchi mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP);
250*7fd79137SRobert Mustacchi if (mqp->mq_items == NULL) {
251*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t));
252*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM));
253*7fd79137SRobert Mustacchi }
254*7fd79137SRobert Mustacchi bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP);
255*7fd79137SRobert Mustacchi
256*7fd79137SRobert Mustacchi mqp->mq_ndthreads = nthrs - 1;
257*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) {
258*7fd79137SRobert Mustacchi mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) *
259*7fd79137SRobert Mustacchi mqp->mq_ndthreads);
260*7fd79137SRobert Mustacchi if (mqp->mq_thrs == NULL) {
261*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) *
262*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP);
263*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t));
264*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM));
265*7fd79137SRobert Mustacchi }
266*7fd79137SRobert Mustacchi }
267*7fd79137SRobert Mustacchi
268*7fd79137SRobert Mustacchi if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
269*7fd79137SRobert Mustacchi NULL)) != 0) {
270*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) {
271*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs,
272*7fd79137SRobert Mustacchi sizeof (thread_t) * mqp->mq_ndthreads);
273*7fd79137SRobert Mustacchi }
274*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) *
275*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP);
276*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t));
277*7fd79137SRobert Mustacchi return (mergeq_error(ret));
278*7fd79137SRobert Mustacchi }
279*7fd79137SRobert Mustacchi
280*7fd79137SRobert Mustacchi if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) {
281*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&mqp->mq_lock));
282*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) {
283*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs,
284*7fd79137SRobert Mustacchi sizeof (thread_t) * mqp->mq_ndthreads);
285*7fd79137SRobert Mustacchi }
286*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) *
287*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP);
288*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t));
289*7fd79137SRobert Mustacchi return (mergeq_error(ret));
290*7fd79137SRobert Mustacchi }
291*7fd79137SRobert Mustacchi
292*7fd79137SRobert Mustacchi mqp->mq_cap = MERGEQ_DEFAULT_CAP;
293*7fd79137SRobert Mustacchi *outp = mqp;
294*7fd79137SRobert Mustacchi return (0);
295*7fd79137SRobert Mustacchi }
296*7fd79137SRobert Mustacchi
297*7fd79137SRobert Mustacchi static void
mergeq_reset(mergeq_t * mqp)298*7fd79137SRobert Mustacchi mergeq_reset(mergeq_t *mqp)
299*7fd79137SRobert Mustacchi {
300*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock));
301*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working == B_FALSE);
302*7fd79137SRobert Mustacchi if (mqp->mq_cap != 0)
303*7fd79137SRobert Mustacchi bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
304*7fd79137SRobert Mustacchi mqp->mq_nitems = 0;
305*7fd79137SRobert Mustacchi mqp->mq_next = 0;
306*7fd79137SRobert Mustacchi mqp->mq_gnext = 0;
307*7fd79137SRobert Mustacchi mqp->mq_nproc = 0;
308*7fd79137SRobert Mustacchi mqp->mq_gnproc = 0;
309*7fd79137SRobert Mustacchi mqp->mq_ncommit = 0;
310*7fd79137SRobert Mustacchi mqp->mq_gncommit = 0;
311*7fd79137SRobert Mustacchi mqp->mq_func = NULL;
312*7fd79137SRobert Mustacchi mqp->mq_arg = NULL;
313*7fd79137SRobert Mustacchi mqp->mq_iserror = B_FALSE;
314*7fd79137SRobert Mustacchi mqp->mq_error = 0;
315*7fd79137SRobert Mustacchi }
316*7fd79137SRobert Mustacchi
317*7fd79137SRobert Mustacchi static int
mergeq_grow(mergeq_t * mqp)318*7fd79137SRobert Mustacchi mergeq_grow(mergeq_t *mqp)
319*7fd79137SRobert Mustacchi {
320*7fd79137SRobert Mustacchi size_t ncap;
321*7fd79137SRobert Mustacchi void **items;
322*7fd79137SRobert Mustacchi
323*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock));
324*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working == B_FALSE);
325*7fd79137SRobert Mustacchi
326*7fd79137SRobert Mustacchi if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP)
327*7fd79137SRobert Mustacchi return (ENOSPC);
328*7fd79137SRobert Mustacchi
329*7fd79137SRobert Mustacchi ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP;
330*7fd79137SRobert Mustacchi items = mergeq_alloc(ncap * sizeof (void *));
331*7fd79137SRobert Mustacchi if (items == NULL)
332*7fd79137SRobert Mustacchi return (ENOMEM);
333*7fd79137SRobert Mustacchi
334*7fd79137SRobert Mustacchi bzero(items, ncap * sizeof (void *));
335*7fd79137SRobert Mustacchi bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *));
336*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *));
337*7fd79137SRobert Mustacchi mqp->mq_items = items;
338*7fd79137SRobert Mustacchi mqp->mq_cap = ncap;
339*7fd79137SRobert Mustacchi return (0);
340*7fd79137SRobert Mustacchi }
341*7fd79137SRobert Mustacchi
342*7fd79137SRobert Mustacchi int
mergeq_add(mergeq_t * mqp,void * item)343*7fd79137SRobert Mustacchi mergeq_add(mergeq_t *mqp, void *item)
344*7fd79137SRobert Mustacchi {
345*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock));
346*7fd79137SRobert Mustacchi if (mqp->mq_working == B_TRUE) {
347*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
348*7fd79137SRobert Mustacchi return (mergeq_error(ENXIO));
349*7fd79137SRobert Mustacchi }
350*7fd79137SRobert Mustacchi
351*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) {
352*7fd79137SRobert Mustacchi int ret;
353*7fd79137SRobert Mustacchi
354*7fd79137SRobert Mustacchi if ((ret = mergeq_grow(mqp)) != 0) {
355*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
356*7fd79137SRobert Mustacchi return (mergeq_error(ret));
357*7fd79137SRobert Mustacchi }
358*7fd79137SRobert Mustacchi }
359*7fd79137SRobert Mustacchi mqp->mq_items[mqp->mq_next] = item;
360*7fd79137SRobert Mustacchi mqp->mq_next++;
361*7fd79137SRobert Mustacchi mqp->mq_nitems++;
362*7fd79137SRobert Mustacchi
363*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
364*7fd79137SRobert Mustacchi return (0);
365*7fd79137SRobert Mustacchi }
366*7fd79137SRobert Mustacchi
367*7fd79137SRobert Mustacchi static size_t
mergeq_slot(mergeq_t * mqp)368*7fd79137SRobert Mustacchi mergeq_slot(mergeq_t *mqp)
369*7fd79137SRobert Mustacchi {
370*7fd79137SRobert Mustacchi size_t s;
371*7fd79137SRobert Mustacchi
372*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock));
373*7fd79137SRobert Mustacchi VERIFY(mqp->mq_next < mqp->mq_cap);
374*7fd79137SRobert Mustacchi
375*7fd79137SRobert Mustacchi /*
376*7fd79137SRobert Mustacchi * This probably should be a cv / wait thing.
377*7fd79137SRobert Mustacchi */
378*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap);
379*7fd79137SRobert Mustacchi
380*7fd79137SRobert Mustacchi s = mqp->mq_next;
381*7fd79137SRobert Mustacchi mqp->mq_next++;
382*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) {
383*7fd79137SRobert Mustacchi mqp->mq_next %= mqp->mq_cap;
384*7fd79137SRobert Mustacchi mqp->mq_gnext++;
385*7fd79137SRobert Mustacchi }
386*7fd79137SRobert Mustacchi
387*7fd79137SRobert Mustacchi return (s);
388*7fd79137SRobert Mustacchi }
389*7fd79137SRobert Mustacchi
390*7fd79137SRobert Mustacchi /*
391*7fd79137SRobert Mustacchi * Internal function to push items onto the queue which is now a circular
392*7fd79137SRobert Mustacchi * buffer. This should only be used once we begin working on the queue.
393*7fd79137SRobert Mustacchi */
394*7fd79137SRobert Mustacchi static void
mergeq_push(mergeq_t * mqp,size_t slot,void * item)395*7fd79137SRobert Mustacchi mergeq_push(mergeq_t *mqp, size_t slot, void *item)
396*7fd79137SRobert Mustacchi {
397*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock));
398*7fd79137SRobert Mustacchi VERIFY(slot < mqp->mq_cap);
399*7fd79137SRobert Mustacchi
400*7fd79137SRobert Mustacchi /*
401*7fd79137SRobert Mustacchi * We need to verify that we don't push over something that exists.
402*7fd79137SRobert Mustacchi * Based on the design, this should never happen. However, in the face
403*7fd79137SRobert Mustacchi * of bugs, anything is possible.
404*7fd79137SRobert Mustacchi */
405*7fd79137SRobert Mustacchi while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE)
406*7fd79137SRobert Mustacchi (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
407*7fd79137SRobert Mustacchi
408*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE)
409*7fd79137SRobert Mustacchi return;
410*7fd79137SRobert Mustacchi
411*7fd79137SRobert Mustacchi mqp->mq_items[slot] = item;
412*7fd79137SRobert Mustacchi mqp->mq_nitems++;
413*7fd79137SRobert Mustacchi mqp->mq_ncommit++;
414*7fd79137SRobert Mustacchi if (mqp->mq_ncommit == mqp->mq_cap) {
415*7fd79137SRobert Mustacchi mqp->mq_ncommit %= mqp->mq_cap;
416*7fd79137SRobert Mustacchi mqp->mq_gncommit++;
417*7fd79137SRobert Mustacchi }
418*7fd79137SRobert Mustacchi (void) cond_broadcast(&mqp->mq_cond);
419*7fd79137SRobert Mustacchi }
420*7fd79137SRobert Mustacchi
421*7fd79137SRobert Mustacchi static void *
mergeq_pop_one(mergeq_t * mqp)422*7fd79137SRobert Mustacchi mergeq_pop_one(mergeq_t *mqp)
423*7fd79137SRobert Mustacchi {
424*7fd79137SRobert Mustacchi void *out;
425*7fd79137SRobert Mustacchi
426*7fd79137SRobert Mustacchi /*
427*7fd79137SRobert Mustacchi * We can't move mq_nproc beyond mq_next if they're on the same
428*7fd79137SRobert Mustacchi * generation.
429*7fd79137SRobert Mustacchi */
430*7fd79137SRobert Mustacchi VERIFY(mqp->mq_gnext != mqp->mq_gnproc ||
431*7fd79137SRobert Mustacchi mqp->mq_nproc != mqp->mq_next);
432*7fd79137SRobert Mustacchi
433*7fd79137SRobert Mustacchi out = mqp->mq_items[mqp->mq_nproc];
434*7fd79137SRobert Mustacchi
435*7fd79137SRobert Mustacchi mqp->mq_items[mqp->mq_nproc] = NULL;
436*7fd79137SRobert Mustacchi mqp->mq_nproc++;
437*7fd79137SRobert Mustacchi if (mqp->mq_nproc == mqp->mq_cap) {
438*7fd79137SRobert Mustacchi mqp->mq_nproc %= mqp->mq_cap;
439*7fd79137SRobert Mustacchi mqp->mq_gnproc++;
440*7fd79137SRobert Mustacchi }
441*7fd79137SRobert Mustacchi mqp->mq_nitems--;
442*7fd79137SRobert Mustacchi
443*7fd79137SRobert Mustacchi return (out);
444*7fd79137SRobert Mustacchi }
445*7fd79137SRobert Mustacchi
446*7fd79137SRobert Mustacchi /*
447*7fd79137SRobert Mustacchi * Pop a set of two entries from the queue. We may not have anything to process
448*7fd79137SRobert Mustacchi * at the moment, eg. be waiting for someone to add something. In which case,
449*7fd79137SRobert Mustacchi * we'll be sitting and waiting.
450*7fd79137SRobert Mustacchi */
451*7fd79137SRobert Mustacchi static boolean_t
mergeq_pop(mergeq_t * mqp,void ** first,void ** second)452*7fd79137SRobert Mustacchi mergeq_pop(mergeq_t *mqp, void **first, void **second)
453*7fd79137SRobert Mustacchi {
454*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock));
455*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nproc < mqp->mq_cap);
456*7fd79137SRobert Mustacchi
457*7fd79137SRobert Mustacchi while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 &&
458*7fd79137SRobert Mustacchi mqp->mq_iserror == B_FALSE)
459*7fd79137SRobert Mustacchi (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
460*7fd79137SRobert Mustacchi
461*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE)
462*7fd79137SRobert Mustacchi return (B_FALSE);
463*7fd79137SRobert Mustacchi
464*7fd79137SRobert Mustacchi if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) {
465*7fd79137SRobert Mustacchi VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1);
466*7fd79137SRobert Mustacchi return (B_FALSE);
467*7fd79137SRobert Mustacchi }
468*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nitems >= 2);
469*7fd79137SRobert Mustacchi
470*7fd79137SRobert Mustacchi *first = mergeq_pop_one(mqp);
471*7fd79137SRobert Mustacchi *second = mergeq_pop_one(mqp);
472*7fd79137SRobert Mustacchi
473*7fd79137SRobert Mustacchi return (B_TRUE);
474*7fd79137SRobert Mustacchi }
475*7fd79137SRobert Mustacchi
476*7fd79137SRobert Mustacchi static void *
mergeq_thr_merge(void * arg)477*7fd79137SRobert Mustacchi mergeq_thr_merge(void *arg)
478*7fd79137SRobert Mustacchi {
479*7fd79137SRobert Mustacchi mergeq_t *mqp = arg;
480*7fd79137SRobert Mustacchi
481*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock));
482*7fd79137SRobert Mustacchi
483*7fd79137SRobert Mustacchi /*
484*7fd79137SRobert Mustacchi * Check to make sure creation worked and if not, fail fast.
485*7fd79137SRobert Mustacchi */
486*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE) {
487*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
488*7fd79137SRobert Mustacchi return (NULL);
489*7fd79137SRobert Mustacchi }
490*7fd79137SRobert Mustacchi
491*7fd79137SRobert Mustacchi for (;;) {
492*7fd79137SRobert Mustacchi void *first, *second, *out;
493*7fd79137SRobert Mustacchi int ret;
494*7fd79137SRobert Mustacchi size_t slot;
495*7fd79137SRobert Mustacchi
496*7fd79137SRobert Mustacchi if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) {
497*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
498*7fd79137SRobert Mustacchi return (NULL);
499*7fd79137SRobert Mustacchi }
500*7fd79137SRobert Mustacchi
501*7fd79137SRobert Mustacchi if (mergeq_pop(mqp, &first, &second) == B_FALSE) {
502*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
503*7fd79137SRobert Mustacchi return (NULL);
504*7fd79137SRobert Mustacchi }
505*7fd79137SRobert Mustacchi slot = mergeq_slot(mqp);
506*7fd79137SRobert Mustacchi
507*7fd79137SRobert Mustacchi mqp->mq_nactthrs++;
508*7fd79137SRobert Mustacchi
509*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
510*7fd79137SRobert Mustacchi ret = mqp->mq_func(first, second, &out, mqp->mq_arg);
511*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock));
512*7fd79137SRobert Mustacchi
513*7fd79137SRobert Mustacchi if (ret != 0) {
514*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_FALSE) {
515*7fd79137SRobert Mustacchi mqp->mq_iserror = B_TRUE;
516*7fd79137SRobert Mustacchi mqp->mq_error = ret;
517*7fd79137SRobert Mustacchi (void) cond_broadcast(&mqp->mq_cond);
518*7fd79137SRobert Mustacchi }
519*7fd79137SRobert Mustacchi mqp->mq_nactthrs--;
520*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
521*7fd79137SRobert Mustacchi return (NULL);
522*7fd79137SRobert Mustacchi }
523*7fd79137SRobert Mustacchi mergeq_push(mqp, slot, out);
524*7fd79137SRobert Mustacchi mqp->mq_nactthrs--;
525*7fd79137SRobert Mustacchi }
526*7fd79137SRobert Mustacchi }
527*7fd79137SRobert Mustacchi
528*7fd79137SRobert Mustacchi int
mergeq_merge(mergeq_t * mqp,mergeq_proc_f * func,void * arg,void ** outp,int * errp)529*7fd79137SRobert Mustacchi mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp,
530*7fd79137SRobert Mustacchi int *errp)
531*7fd79137SRobert Mustacchi {
532*7fd79137SRobert Mustacchi int ret, i;
533*7fd79137SRobert Mustacchi boolean_t seterr = B_FALSE;
534*7fd79137SRobert Mustacchi
535*7fd79137SRobert Mustacchi if (mqp == NULL || func == NULL || outp == NULL) {
536*7fd79137SRobert Mustacchi return (mergeq_error(EINVAL));
537*7fd79137SRobert Mustacchi }
538*7fd79137SRobert Mustacchi
539*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock));
540*7fd79137SRobert Mustacchi if (mqp->mq_working == B_TRUE) {
541*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
542*7fd79137SRobert Mustacchi return (mergeq_error(EBUSY));
543*7fd79137SRobert Mustacchi }
544*7fd79137SRobert Mustacchi
545*7fd79137SRobert Mustacchi if (mqp->mq_nitems == 0) {
546*7fd79137SRobert Mustacchi *outp = NULL;
547*7fd79137SRobert Mustacchi mergeq_reset(mqp);
548*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
549*7fd79137SRobert Mustacchi return (0);
550*7fd79137SRobert Mustacchi }
551*7fd79137SRobert Mustacchi
552*7fd79137SRobert Mustacchi /*
553*7fd79137SRobert Mustacchi * Now that we've finished adding items to the queue, turn it into a
554*7fd79137SRobert Mustacchi * circular buffer.
555*7fd79137SRobert Mustacchi */
556*7fd79137SRobert Mustacchi mqp->mq_func = func;
557*7fd79137SRobert Mustacchi mqp->mq_arg = arg;
558*7fd79137SRobert Mustacchi mqp->mq_nproc = 0;
559*7fd79137SRobert Mustacchi mqp->mq_working = B_TRUE;
560*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) {
561*7fd79137SRobert Mustacchi mqp->mq_next %= mqp->mq_cap;
562*7fd79137SRobert Mustacchi mqp->mq_gnext++;
563*7fd79137SRobert Mustacchi }
564*7fd79137SRobert Mustacchi mqp->mq_ncommit = mqp->mq_next;
565*7fd79137SRobert Mustacchi
566*7fd79137SRobert Mustacchi ret = 0;
567*7fd79137SRobert Mustacchi for (i = 0; i < mqp->mq_ndthreads; i++) {
568*7fd79137SRobert Mustacchi ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0,
569*7fd79137SRobert Mustacchi &mqp->mq_thrs[i]);
570*7fd79137SRobert Mustacchi if (ret != 0) {
571*7fd79137SRobert Mustacchi mqp->mq_iserror = B_TRUE;
572*7fd79137SRobert Mustacchi break;
573*7fd79137SRobert Mustacchi }
574*7fd79137SRobert Mustacchi }
575*7fd79137SRobert Mustacchi
576*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
577*7fd79137SRobert Mustacchi if (ret == 0)
578*7fd79137SRobert Mustacchi (void) mergeq_thr_merge(mqp);
579*7fd79137SRobert Mustacchi
580*7fd79137SRobert Mustacchi for (i = 0; i < mqp->mq_ndthreads; i++) {
581*7fd79137SRobert Mustacchi VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL));
582*7fd79137SRobert Mustacchi }
583*7fd79137SRobert Mustacchi
584*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock));
585*7fd79137SRobert Mustacchi
586*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nactthrs == 0);
587*7fd79137SRobert Mustacchi mqp->mq_working = B_FALSE;
588*7fd79137SRobert Mustacchi if (ret == 0 && mqp->mq_iserror == B_FALSE) {
589*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nitems == 1);
590*7fd79137SRobert Mustacchi *outp = mergeq_pop_one(mqp);
591*7fd79137SRobert Mustacchi } else if (ret == 0 && mqp->mq_iserror == B_TRUE) {
592*7fd79137SRobert Mustacchi ret = MERGEQ_UERROR;
593*7fd79137SRobert Mustacchi if (errp != NULL)
594*7fd79137SRobert Mustacchi *errp = mqp->mq_error;
595*7fd79137SRobert Mustacchi } else {
596*7fd79137SRobert Mustacchi seterr = B_TRUE;
597*7fd79137SRobert Mustacchi }
598*7fd79137SRobert Mustacchi
599*7fd79137SRobert Mustacchi mergeq_reset(mqp);
600*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock));
601*7fd79137SRobert Mustacchi
602*7fd79137SRobert Mustacchi if (seterr == B_TRUE)
603*7fd79137SRobert Mustacchi return (mergeq_error(ret));
604*7fd79137SRobert Mustacchi
605*7fd79137SRobert Mustacchi return (ret);
606*7fd79137SRobert Mustacchi }
607