1*7fd79137SRobert Mustacchi /* 2*7fd79137SRobert Mustacchi * This file and its contents are supplied under the terms of the 3*7fd79137SRobert Mustacchi * Common Development and Distribution License ("CDDL"), version 1.0. 4*7fd79137SRobert Mustacchi * You may only use this file in accordance with the terms of version 5*7fd79137SRobert Mustacchi * 1.0 of the CDDL. 6*7fd79137SRobert Mustacchi * 7*7fd79137SRobert Mustacchi * A full copy of the text of the CDDL should have accompanied this 8*7fd79137SRobert Mustacchi * source. A copy of the CDDL is also available via the Internet at 9*7fd79137SRobert Mustacchi * http://www.illumos.org/license/CDDL. 10*7fd79137SRobert Mustacchi */ 11*7fd79137SRobert Mustacchi 12*7fd79137SRobert Mustacchi /* 13*7fd79137SRobert Mustacchi * Copyright 2015 Joyent, Inc. 14*7fd79137SRobert Mustacchi */ 15*7fd79137SRobert Mustacchi 16*7fd79137SRobert Mustacchi /* 17*7fd79137SRobert Mustacchi * Merge queue 18*7fd79137SRobert Mustacchi * 19*7fd79137SRobert Mustacchi * A multi-threaded merging queue. 20*7fd79137SRobert Mustacchi * 21*7fd79137SRobert Mustacchi * The general constraint of the merge queue is that if a set of items are 22*7fd79137SRobert Mustacchi * inserted into the queue in the same order, then no matter how many threads 23*7fd79137SRobert Mustacchi * are on the scene, we will always process the items in the same order. The 24*7fd79137SRobert Mustacchi * secondary constraint is that to support environments that must be 25*7fd79137SRobert Mustacchi * single-threaded, we explicitly *must not* create a thread in the case where 26*7fd79137SRobert Mustacchi * the number of requested threads is just one. 27*7fd79137SRobert Mustacchi * 28*7fd79137SRobert Mustacchi * To that end, we've designed our queue as a circular buffer. We will grow that 29*7fd79137SRobert Mustacchi * buffer to contain enough space for all the input items, after which we'll 30*7fd79137SRobert Mustacchi * then treat it as a circular buffer. 31*7fd79137SRobert Mustacchi * 32*7fd79137SRobert Mustacchi * Items will be issued to a processing function two at a time, until there is 33*7fd79137SRobert Mustacchi * only one item remaining in the queue, at which point we will be doing any 34*7fd79137SRobert Mustacchi * merging work. 35*7fd79137SRobert Mustacchi * 36*7fd79137SRobert Mustacchi * A given queue has three different entries that we care about tracking: 37*7fd79137SRobert Mustacchi * 38*7fd79137SRobert Mustacchi * o mq_nproc - What is the slot of the next item to process for something 39*7fd79137SRobert Mustacchi * looking for work. 40*7fd79137SRobert Mustacchi * 41*7fd79137SRobert Mustacchi * o mq_next - What is the slot of the next item that should be inserted into 42*7fd79137SRobert Mustacchi * the queue. 43*7fd79137SRobert Mustacchi * 44*7fd79137SRobert Mustacchi * o mq_ncommit - What is the slot of the next item that should be committed. 45*7fd79137SRobert Mustacchi * 46*7fd79137SRobert Mustacchi * When a thread comes and looks for work, we pop entries off of the queue based 47*7fd79137SRobert Mustacchi * on the index provided by mq_nproc. At the same time, it also gets the slot 48*7fd79137SRobert Mustacchi * that it should place the result in, which is mq_next. However, because we 49*7fd79137SRobert Mustacchi * have multiple threads that are operating on the system, we want to make sure 50*7fd79137SRobert Mustacchi * that we push things onto the queue in order. We do that by allocating a slot 51*7fd79137SRobert Mustacchi * to each task and when it completes, it waits for its slot to be ready based 52*7fd79137SRobert Mustacchi * on it being the value of mq_ncommit. 53*7fd79137SRobert Mustacchi * 54*7fd79137SRobert Mustacchi * In addition, we keep track of the number of items in the queue as well as the 55*7fd79137SRobert Mustacchi * number of active workers. There's also a generation count that is used to 56*7fd79137SRobert Mustacchi * figure out when the various values might lap one another. 57*7fd79137SRobert Mustacchi * 58*7fd79137SRobert Mustacchi * The following images show what happens when we have a queue with six items 59*7fd79137SRobert Mustacchi * and whose capacity has been shrunk to six, to better fit in the screen. 60*7fd79137SRobert Mustacchi * 61*7fd79137SRobert Mustacchi * 62*7fd79137SRobert Mustacchi * 1) This is the initial configuration of the queue right before any processing 63*7fd79137SRobert Mustacchi * is done in the context of mergeq_merge(). Every box has an initial item for 64*7fd79137SRobert Mustacchi * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and 65*7fd79137SRobert Mustacchi * mq_ncommit will all point at the initial entry. However, the mq_next has 66*7fd79137SRobert Mustacchi * already lapped around the array and thus has a generation count of one. 67*7fd79137SRobert Mustacchi * 68*7fd79137SRobert Mustacchi * The '+' characters indicate which bucket the corresponding value of mq_nproc, 69*7fd79137SRobert Mustacchi * mq_ncommit, and mq_nproc. 70*7fd79137SRobert Mustacchi * 71*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 72*7fd79137SRobert Mustacchi * | X || X || X || X || X || X | 73*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 74*7fd79137SRobert Mustacchi * mq_next (g1) + 75*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 76*7fd79137SRobert Mustacchi * mq_nproc (g0) + 77*7fd79137SRobert Mustacchi * 78*7fd79137SRobert Mustacchi * 2) This shows the state right as the first thread begins to process an entry. 79*7fd79137SRobert Mustacchi * Note in this example we will have two threads processing this queue. Note, 80*7fd79137SRobert Mustacchi * mq_ncommit has not advanced. This is because the first thread has started 81*7fd79137SRobert Mustacchi * processing entries, but it has not finished, and thus we can't commit it. 82*7fd79137SRobert Mustacchi * We've incremented mq_next by one because it has gone ahead and assigned a 83*7fd79137SRobert Mustacchi * single entry. We've incremented mq_nproc by two, because we have removed two 84*7fd79137SRobert Mustacchi * entries and thus will have another set available. 85*7fd79137SRobert Mustacchi * 86*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 0 87*7fd79137SRobert Mustacchi * | || || X || X || X || X | t2 - idle 88*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 89*7fd79137SRobert Mustacchi * mq_next (g1) + 90*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 91*7fd79137SRobert Mustacchi * mq_nproc (g0) + 92*7fd79137SRobert Mustacchi * 93*7fd79137SRobert Mustacchi * 94*7fd79137SRobert Mustacchi * 3) This shows the state right after the second thread begins to process an 95*7fd79137SRobert Mustacchi * entry, note that the first thread has not finished. The changes are very 96*7fd79137SRobert Mustacchi * similar to the previous state, we've advanced, mq_nproc and mq_next, but not 97*7fd79137SRobert Mustacchi * mq_ncommit. 98*7fd79137SRobert Mustacchi * 99*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 0 100*7fd79137SRobert Mustacchi * | || || || || X || X | t2 - slot 1 101*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 102*7fd79137SRobert Mustacchi * mq_next (g1) + 103*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 104*7fd79137SRobert Mustacchi * mq_nproc (g0) + 105*7fd79137SRobert Mustacchi * 106*7fd79137SRobert Mustacchi * 4) This shows the state after thread one has finished processing an item, but 107*7fd79137SRobert Mustacchi * before it does anything else. Note that even if thread two finishes early, it 108*7fd79137SRobert Mustacchi * cannot commit its item until thread one finishes. Here 'Y' refers to the 109*7fd79137SRobert Mustacchi * result of merging the first two 'X's. 110*7fd79137SRobert Mustacchi * 111*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - idle 112*7fd79137SRobert Mustacchi * | Y || || || || X || X | t2 - slot 1 113*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 114*7fd79137SRobert Mustacchi * mq_next (g1) + 115*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 116*7fd79137SRobert Mustacchi * mq_nproc (g0) + 117*7fd79137SRobert Mustacchi * 118*7fd79137SRobert Mustacchi * 5) This shows the state after thread one has begun to process the next round 119*7fd79137SRobert Mustacchi * and after thread two has committed, but before it begins processing the next 120*7fd79137SRobert Mustacchi * item. Note that mq_nproc has wrapped around and we've bumped its generation 121*7fd79137SRobert Mustacchi * counter. 122*7fd79137SRobert Mustacchi * 123*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 2 124*7fd79137SRobert Mustacchi * | Y || Y || || || || | t2 - idle 125*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 126*7fd79137SRobert Mustacchi * mq_next (g1) + 127*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 128*7fd79137SRobert Mustacchi * mq_nproc (g0) + 129*7fd79137SRobert Mustacchi * 130*7fd79137SRobert Mustacchi * 6) Here, thread two, will take the next two Y values and thread 1 will commit 131*7fd79137SRobert Mustacchi * its 'Y'. Thread one now must wait until thread two finishes such that it can 132*7fd79137SRobert Mustacchi * do additional work. 133*7fd79137SRobert Mustacchi * 134*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - waiting 135*7fd79137SRobert Mustacchi * | || || Y || || || | t2 - slot 3 136*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 137*7fd79137SRobert Mustacchi * mq_next (g1) + 138*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 139*7fd79137SRobert Mustacchi * mq_nproc (g0) + 140*7fd79137SRobert Mustacchi * 141*7fd79137SRobert Mustacchi * 7) Here, thread two has committed and thread one is about to go process the 142*7fd79137SRobert Mustacchi * final entry. The character 'Z' represents the results of merging two 'Y's. 143*7fd79137SRobert Mustacchi * 144*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - idle 145*7fd79137SRobert Mustacchi * | || || Y || Z || || | t2 - idle 146*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 147*7fd79137SRobert Mustacchi * mq_next (g1) + 148*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 149*7fd79137SRobert Mustacchi * mq_nproc (g0) + 150*7fd79137SRobert Mustacchi * 151*7fd79137SRobert Mustacchi * 8) Here, thread one is processing the final item. Thread two is waiting in 152*7fd79137SRobert Mustacchi * mergeq_pop() for enough items to be available. In this case, it will never 153*7fd79137SRobert Mustacchi * happen; however, once all threads have finished it will break out. 154*7fd79137SRobert Mustacchi * 155*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 4 156*7fd79137SRobert Mustacchi * | || || || || || | t2 - idle 157*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 158*7fd79137SRobert Mustacchi * mq_next (g1) + 159*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 160*7fd79137SRobert Mustacchi * mq_nproc (g0) + 161*7fd79137SRobert Mustacchi * 162*7fd79137SRobert Mustacchi * 9) This is the final state of the queue, it has a single '*' item which is 163*7fd79137SRobert Mustacchi * the final merge result. At this point, both thread one and thread two would 164*7fd79137SRobert Mustacchi * stop processing and we'll return the result to the user. 165*7fd79137SRobert Mustacchi * 166*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ t1 - slot 4 167*7fd79137SRobert Mustacchi * | || || || || * || | t2 - idle 168*7fd79137SRobert Mustacchi * +---++---++---++---++---++---+ 169*7fd79137SRobert Mustacchi * mq_next (g1) + 170*7fd79137SRobert Mustacchi * mq_ncommit (g0) + 171*7fd79137SRobert Mustacchi * mq_nproc (g0) + 172*7fd79137SRobert Mustacchi * 173*7fd79137SRobert Mustacchi * 174*7fd79137SRobert Mustacchi * Note, that if at any point in time the processing function fails, then all 175*7fd79137SRobert Mustacchi * the merges will quiesce and that error will be propagated back to the user. 176*7fd79137SRobert Mustacchi */ 177*7fd79137SRobert Mustacchi 178*7fd79137SRobert Mustacchi #include <strings.h> 179*7fd79137SRobert Mustacchi #include <sys/debug.h> 180*7fd79137SRobert Mustacchi #include <thread.h> 181*7fd79137SRobert Mustacchi #include <synch.h> 182*7fd79137SRobert Mustacchi #include <errno.h> 183*7fd79137SRobert Mustacchi #include <limits.h> 184*7fd79137SRobert Mustacchi #include <stdlib.h> 185*7fd79137SRobert Mustacchi 186*7fd79137SRobert Mustacchi #include "mergeq.h" 187*7fd79137SRobert Mustacchi 188*7fd79137SRobert Mustacchi struct mergeq { 189*7fd79137SRobert Mustacchi mutex_t mq_lock; /* Protects items below */ 190*7fd79137SRobert Mustacchi cond_t mq_cond; /* Condition variable */ 191*7fd79137SRobert Mustacchi void **mq_items; /* Array of items to process */ 192*7fd79137SRobert Mustacchi size_t mq_nitems; /* Number of items in the queue */ 193*7fd79137SRobert Mustacchi size_t mq_cap; /* Capacity of the items */ 194*7fd79137SRobert Mustacchi size_t mq_next; /* Place to put next entry */ 195*7fd79137SRobert Mustacchi size_t mq_gnext; /* Generation for next */ 196*7fd79137SRobert Mustacchi size_t mq_nproc; /* Index of next thing to process */ 197*7fd79137SRobert Mustacchi size_t mq_gnproc; /* Generation for next proc */ 198*7fd79137SRobert Mustacchi size_t mq_ncommit; /* Index of the next thing to commit */ 199*7fd79137SRobert Mustacchi size_t mq_gncommit; /* Commit generation */ 200*7fd79137SRobert Mustacchi uint_t mq_nactthrs; /* Number of active threads */ 201*7fd79137SRobert Mustacchi uint_t mq_ndthreads; /* Desired number of threads */ 202*7fd79137SRobert Mustacchi thread_t *mq_thrs; /* Actual threads */ 203*7fd79137SRobert Mustacchi mergeq_proc_f *mq_func; /* Processing function */ 204*7fd79137SRobert Mustacchi void *mq_arg; /* Argument for processing */ 205*7fd79137SRobert Mustacchi boolean_t mq_working; /* Are we working on processing */ 206*7fd79137SRobert Mustacchi boolean_t mq_iserror; /* Have we encountered an error? */ 207*7fd79137SRobert Mustacchi int mq_error; 208*7fd79137SRobert Mustacchi }; 209*7fd79137SRobert Mustacchi 210*7fd79137SRobert Mustacchi #define MERGEQ_DEFAULT_CAP 64 211*7fd79137SRobert Mustacchi 212*7fd79137SRobert Mustacchi static int 213*7fd79137SRobert Mustacchi mergeq_error(int err) 214*7fd79137SRobert Mustacchi { 215*7fd79137SRobert Mustacchi errno = err; 216*7fd79137SRobert Mustacchi return (MERGEQ_ERROR); 217*7fd79137SRobert Mustacchi } 218*7fd79137SRobert Mustacchi 219*7fd79137SRobert Mustacchi void 220*7fd79137SRobert Mustacchi mergeq_fini(mergeq_t *mqp) 221*7fd79137SRobert Mustacchi { 222*7fd79137SRobert Mustacchi if (mqp == NULL) 223*7fd79137SRobert Mustacchi return; 224*7fd79137SRobert Mustacchi 225*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working != B_TRUE); 226*7fd79137SRobert Mustacchi 227*7fd79137SRobert Mustacchi if (mqp->mq_items != NULL) 228*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap); 229*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) { 230*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs, sizeof (thread_t) * 231*7fd79137SRobert Mustacchi mqp->mq_ndthreads); 232*7fd79137SRobert Mustacchi } 233*7fd79137SRobert Mustacchi VERIFY0(cond_destroy(&mqp->mq_cond)); 234*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&mqp->mq_lock)); 235*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t)); 236*7fd79137SRobert Mustacchi } 237*7fd79137SRobert Mustacchi 238*7fd79137SRobert Mustacchi int 239*7fd79137SRobert Mustacchi mergeq_init(mergeq_t **outp, uint_t nthrs) 240*7fd79137SRobert Mustacchi { 241*7fd79137SRobert Mustacchi int ret; 242*7fd79137SRobert Mustacchi mergeq_t *mqp; 243*7fd79137SRobert Mustacchi 244*7fd79137SRobert Mustacchi mqp = mergeq_alloc(sizeof (mergeq_t)); 245*7fd79137SRobert Mustacchi if (mqp == NULL) 246*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM)); 247*7fd79137SRobert Mustacchi 248*7fd79137SRobert Mustacchi bzero(mqp, sizeof (mergeq_t)); 249*7fd79137SRobert Mustacchi mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP); 250*7fd79137SRobert Mustacchi if (mqp->mq_items == NULL) { 251*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t)); 252*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM)); 253*7fd79137SRobert Mustacchi } 254*7fd79137SRobert Mustacchi bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP); 255*7fd79137SRobert Mustacchi 256*7fd79137SRobert Mustacchi mqp->mq_ndthreads = nthrs - 1; 257*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) { 258*7fd79137SRobert Mustacchi mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) * 259*7fd79137SRobert Mustacchi mqp->mq_ndthreads); 260*7fd79137SRobert Mustacchi if (mqp->mq_thrs == NULL) { 261*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) * 262*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP); 263*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t)); 264*7fd79137SRobert Mustacchi return (mergeq_error(ENOMEM)); 265*7fd79137SRobert Mustacchi } 266*7fd79137SRobert Mustacchi } 267*7fd79137SRobert Mustacchi 268*7fd79137SRobert Mustacchi if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK, 269*7fd79137SRobert Mustacchi NULL)) != 0) { 270*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) { 271*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs, 272*7fd79137SRobert Mustacchi sizeof (thread_t) * mqp->mq_ndthreads); 273*7fd79137SRobert Mustacchi } 274*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) * 275*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP); 276*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t)); 277*7fd79137SRobert Mustacchi return (mergeq_error(ret)); 278*7fd79137SRobert Mustacchi } 279*7fd79137SRobert Mustacchi 280*7fd79137SRobert Mustacchi if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) { 281*7fd79137SRobert Mustacchi VERIFY0(mutex_destroy(&mqp->mq_lock)); 282*7fd79137SRobert Mustacchi if (mqp->mq_ndthreads > 0) { 283*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_thrs, 284*7fd79137SRobert Mustacchi sizeof (thread_t) * mqp->mq_ndthreads); 285*7fd79137SRobert Mustacchi } 286*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (void *) * 287*7fd79137SRobert Mustacchi MERGEQ_DEFAULT_CAP); 288*7fd79137SRobert Mustacchi mergeq_free(mqp, sizeof (mergeq_t)); 289*7fd79137SRobert Mustacchi return (mergeq_error(ret)); 290*7fd79137SRobert Mustacchi } 291*7fd79137SRobert Mustacchi 292*7fd79137SRobert Mustacchi mqp->mq_cap = MERGEQ_DEFAULT_CAP; 293*7fd79137SRobert Mustacchi *outp = mqp; 294*7fd79137SRobert Mustacchi return (0); 295*7fd79137SRobert Mustacchi } 296*7fd79137SRobert Mustacchi 297*7fd79137SRobert Mustacchi static void 298*7fd79137SRobert Mustacchi mergeq_reset(mergeq_t *mqp) 299*7fd79137SRobert Mustacchi { 300*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock)); 301*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working == B_FALSE); 302*7fd79137SRobert Mustacchi if (mqp->mq_cap != 0) 303*7fd79137SRobert Mustacchi bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap); 304*7fd79137SRobert Mustacchi mqp->mq_nitems = 0; 305*7fd79137SRobert Mustacchi mqp->mq_next = 0; 306*7fd79137SRobert Mustacchi mqp->mq_gnext = 0; 307*7fd79137SRobert Mustacchi mqp->mq_nproc = 0; 308*7fd79137SRobert Mustacchi mqp->mq_gnproc = 0; 309*7fd79137SRobert Mustacchi mqp->mq_ncommit = 0; 310*7fd79137SRobert Mustacchi mqp->mq_gncommit = 0; 311*7fd79137SRobert Mustacchi mqp->mq_func = NULL; 312*7fd79137SRobert Mustacchi mqp->mq_arg = NULL; 313*7fd79137SRobert Mustacchi mqp->mq_iserror = B_FALSE; 314*7fd79137SRobert Mustacchi mqp->mq_error = 0; 315*7fd79137SRobert Mustacchi } 316*7fd79137SRobert Mustacchi 317*7fd79137SRobert Mustacchi static int 318*7fd79137SRobert Mustacchi mergeq_grow(mergeq_t *mqp) 319*7fd79137SRobert Mustacchi { 320*7fd79137SRobert Mustacchi size_t ncap; 321*7fd79137SRobert Mustacchi void **items; 322*7fd79137SRobert Mustacchi 323*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock)); 324*7fd79137SRobert Mustacchi VERIFY(mqp->mq_working == B_FALSE); 325*7fd79137SRobert Mustacchi 326*7fd79137SRobert Mustacchi if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP) 327*7fd79137SRobert Mustacchi return (ENOSPC); 328*7fd79137SRobert Mustacchi 329*7fd79137SRobert Mustacchi ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP; 330*7fd79137SRobert Mustacchi items = mergeq_alloc(ncap * sizeof (void *)); 331*7fd79137SRobert Mustacchi if (items == NULL) 332*7fd79137SRobert Mustacchi return (ENOMEM); 333*7fd79137SRobert Mustacchi 334*7fd79137SRobert Mustacchi bzero(items, ncap * sizeof (void *)); 335*7fd79137SRobert Mustacchi bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *)); 336*7fd79137SRobert Mustacchi mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *)); 337*7fd79137SRobert Mustacchi mqp->mq_items = items; 338*7fd79137SRobert Mustacchi mqp->mq_cap = ncap; 339*7fd79137SRobert Mustacchi return (0); 340*7fd79137SRobert Mustacchi } 341*7fd79137SRobert Mustacchi 342*7fd79137SRobert Mustacchi int 343*7fd79137SRobert Mustacchi mergeq_add(mergeq_t *mqp, void *item) 344*7fd79137SRobert Mustacchi { 345*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock)); 346*7fd79137SRobert Mustacchi if (mqp->mq_working == B_TRUE) { 347*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 348*7fd79137SRobert Mustacchi return (mergeq_error(ENXIO)); 349*7fd79137SRobert Mustacchi } 350*7fd79137SRobert Mustacchi 351*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) { 352*7fd79137SRobert Mustacchi int ret; 353*7fd79137SRobert Mustacchi 354*7fd79137SRobert Mustacchi if ((ret = mergeq_grow(mqp)) != 0) { 355*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 356*7fd79137SRobert Mustacchi return (mergeq_error(ret)); 357*7fd79137SRobert Mustacchi } 358*7fd79137SRobert Mustacchi } 359*7fd79137SRobert Mustacchi mqp->mq_items[mqp->mq_next] = item; 360*7fd79137SRobert Mustacchi mqp->mq_next++; 361*7fd79137SRobert Mustacchi mqp->mq_nitems++; 362*7fd79137SRobert Mustacchi 363*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 364*7fd79137SRobert Mustacchi return (0); 365*7fd79137SRobert Mustacchi } 366*7fd79137SRobert Mustacchi 367*7fd79137SRobert Mustacchi static size_t 368*7fd79137SRobert Mustacchi mergeq_slot(mergeq_t *mqp) 369*7fd79137SRobert Mustacchi { 370*7fd79137SRobert Mustacchi size_t s; 371*7fd79137SRobert Mustacchi 372*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock)); 373*7fd79137SRobert Mustacchi VERIFY(mqp->mq_next < mqp->mq_cap); 374*7fd79137SRobert Mustacchi 375*7fd79137SRobert Mustacchi /* 376*7fd79137SRobert Mustacchi * This probably should be a cv / wait thing. 377*7fd79137SRobert Mustacchi */ 378*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap); 379*7fd79137SRobert Mustacchi 380*7fd79137SRobert Mustacchi s = mqp->mq_next; 381*7fd79137SRobert Mustacchi mqp->mq_next++; 382*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) { 383*7fd79137SRobert Mustacchi mqp->mq_next %= mqp->mq_cap; 384*7fd79137SRobert Mustacchi mqp->mq_gnext++; 385*7fd79137SRobert Mustacchi } 386*7fd79137SRobert Mustacchi 387*7fd79137SRobert Mustacchi return (s); 388*7fd79137SRobert Mustacchi } 389*7fd79137SRobert Mustacchi 390*7fd79137SRobert Mustacchi /* 391*7fd79137SRobert Mustacchi * Internal function to push items onto the queue which is now a circular 392*7fd79137SRobert Mustacchi * buffer. This should only be used once we begin working on the queue. 393*7fd79137SRobert Mustacchi */ 394*7fd79137SRobert Mustacchi static void 395*7fd79137SRobert Mustacchi mergeq_push(mergeq_t *mqp, size_t slot, void *item) 396*7fd79137SRobert Mustacchi { 397*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock)); 398*7fd79137SRobert Mustacchi VERIFY(slot < mqp->mq_cap); 399*7fd79137SRobert Mustacchi 400*7fd79137SRobert Mustacchi /* 401*7fd79137SRobert Mustacchi * We need to verify that we don't push over something that exists. 402*7fd79137SRobert Mustacchi * Based on the design, this should never happen. However, in the face 403*7fd79137SRobert Mustacchi * of bugs, anything is possible. 404*7fd79137SRobert Mustacchi */ 405*7fd79137SRobert Mustacchi while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE) 406*7fd79137SRobert Mustacchi (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); 407*7fd79137SRobert Mustacchi 408*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE) 409*7fd79137SRobert Mustacchi return; 410*7fd79137SRobert Mustacchi 411*7fd79137SRobert Mustacchi mqp->mq_items[slot] = item; 412*7fd79137SRobert Mustacchi mqp->mq_nitems++; 413*7fd79137SRobert Mustacchi mqp->mq_ncommit++; 414*7fd79137SRobert Mustacchi if (mqp->mq_ncommit == mqp->mq_cap) { 415*7fd79137SRobert Mustacchi mqp->mq_ncommit %= mqp->mq_cap; 416*7fd79137SRobert Mustacchi mqp->mq_gncommit++; 417*7fd79137SRobert Mustacchi } 418*7fd79137SRobert Mustacchi (void) cond_broadcast(&mqp->mq_cond); 419*7fd79137SRobert Mustacchi } 420*7fd79137SRobert Mustacchi 421*7fd79137SRobert Mustacchi static void * 422*7fd79137SRobert Mustacchi mergeq_pop_one(mergeq_t *mqp) 423*7fd79137SRobert Mustacchi { 424*7fd79137SRobert Mustacchi void *out; 425*7fd79137SRobert Mustacchi 426*7fd79137SRobert Mustacchi /* 427*7fd79137SRobert Mustacchi * We can't move mq_nproc beyond mq_next if they're on the same 428*7fd79137SRobert Mustacchi * generation. 429*7fd79137SRobert Mustacchi */ 430*7fd79137SRobert Mustacchi VERIFY(mqp->mq_gnext != mqp->mq_gnproc || 431*7fd79137SRobert Mustacchi mqp->mq_nproc != mqp->mq_next); 432*7fd79137SRobert Mustacchi 433*7fd79137SRobert Mustacchi out = mqp->mq_items[mqp->mq_nproc]; 434*7fd79137SRobert Mustacchi 435*7fd79137SRobert Mustacchi mqp->mq_items[mqp->mq_nproc] = NULL; 436*7fd79137SRobert Mustacchi mqp->mq_nproc++; 437*7fd79137SRobert Mustacchi if (mqp->mq_nproc == mqp->mq_cap) { 438*7fd79137SRobert Mustacchi mqp->mq_nproc %= mqp->mq_cap; 439*7fd79137SRobert Mustacchi mqp->mq_gnproc++; 440*7fd79137SRobert Mustacchi } 441*7fd79137SRobert Mustacchi mqp->mq_nitems--; 442*7fd79137SRobert Mustacchi 443*7fd79137SRobert Mustacchi return (out); 444*7fd79137SRobert Mustacchi } 445*7fd79137SRobert Mustacchi 446*7fd79137SRobert Mustacchi /* 447*7fd79137SRobert Mustacchi * Pop a set of two entries from the queue. We may not have anything to process 448*7fd79137SRobert Mustacchi * at the moment, eg. be waiting for someone to add something. In which case, 449*7fd79137SRobert Mustacchi * we'll be sitting and waiting. 450*7fd79137SRobert Mustacchi */ 451*7fd79137SRobert Mustacchi static boolean_t 452*7fd79137SRobert Mustacchi mergeq_pop(mergeq_t *mqp, void **first, void **second) 453*7fd79137SRobert Mustacchi { 454*7fd79137SRobert Mustacchi VERIFY(MUTEX_HELD(&mqp->mq_lock)); 455*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nproc < mqp->mq_cap); 456*7fd79137SRobert Mustacchi 457*7fd79137SRobert Mustacchi while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 && 458*7fd79137SRobert Mustacchi mqp->mq_iserror == B_FALSE) 459*7fd79137SRobert Mustacchi (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); 460*7fd79137SRobert Mustacchi 461*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE) 462*7fd79137SRobert Mustacchi return (B_FALSE); 463*7fd79137SRobert Mustacchi 464*7fd79137SRobert Mustacchi if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) { 465*7fd79137SRobert Mustacchi VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1); 466*7fd79137SRobert Mustacchi return (B_FALSE); 467*7fd79137SRobert Mustacchi } 468*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nitems >= 2); 469*7fd79137SRobert Mustacchi 470*7fd79137SRobert Mustacchi *first = mergeq_pop_one(mqp); 471*7fd79137SRobert Mustacchi *second = mergeq_pop_one(mqp); 472*7fd79137SRobert Mustacchi 473*7fd79137SRobert Mustacchi return (B_TRUE); 474*7fd79137SRobert Mustacchi } 475*7fd79137SRobert Mustacchi 476*7fd79137SRobert Mustacchi static void * 477*7fd79137SRobert Mustacchi mergeq_thr_merge(void *arg) 478*7fd79137SRobert Mustacchi { 479*7fd79137SRobert Mustacchi mergeq_t *mqp = arg; 480*7fd79137SRobert Mustacchi 481*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock)); 482*7fd79137SRobert Mustacchi 483*7fd79137SRobert Mustacchi /* 484*7fd79137SRobert Mustacchi * Check to make sure creation worked and if not, fail fast. 485*7fd79137SRobert Mustacchi */ 486*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_TRUE) { 487*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 488*7fd79137SRobert Mustacchi return (NULL); 489*7fd79137SRobert Mustacchi } 490*7fd79137SRobert Mustacchi 491*7fd79137SRobert Mustacchi for (;;) { 492*7fd79137SRobert Mustacchi void *first, *second, *out; 493*7fd79137SRobert Mustacchi int ret; 494*7fd79137SRobert Mustacchi size_t slot; 495*7fd79137SRobert Mustacchi 496*7fd79137SRobert Mustacchi if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) { 497*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 498*7fd79137SRobert Mustacchi return (NULL); 499*7fd79137SRobert Mustacchi } 500*7fd79137SRobert Mustacchi 501*7fd79137SRobert Mustacchi if (mergeq_pop(mqp, &first, &second) == B_FALSE) { 502*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 503*7fd79137SRobert Mustacchi return (NULL); 504*7fd79137SRobert Mustacchi } 505*7fd79137SRobert Mustacchi slot = mergeq_slot(mqp); 506*7fd79137SRobert Mustacchi 507*7fd79137SRobert Mustacchi mqp->mq_nactthrs++; 508*7fd79137SRobert Mustacchi 509*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 510*7fd79137SRobert Mustacchi ret = mqp->mq_func(first, second, &out, mqp->mq_arg); 511*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock)); 512*7fd79137SRobert Mustacchi 513*7fd79137SRobert Mustacchi if (ret != 0) { 514*7fd79137SRobert Mustacchi if (mqp->mq_iserror == B_FALSE) { 515*7fd79137SRobert Mustacchi mqp->mq_iserror = B_TRUE; 516*7fd79137SRobert Mustacchi mqp->mq_error = ret; 517*7fd79137SRobert Mustacchi (void) cond_broadcast(&mqp->mq_cond); 518*7fd79137SRobert Mustacchi } 519*7fd79137SRobert Mustacchi mqp->mq_nactthrs--; 520*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 521*7fd79137SRobert Mustacchi return (NULL); 522*7fd79137SRobert Mustacchi } 523*7fd79137SRobert Mustacchi mergeq_push(mqp, slot, out); 524*7fd79137SRobert Mustacchi mqp->mq_nactthrs--; 525*7fd79137SRobert Mustacchi } 526*7fd79137SRobert Mustacchi } 527*7fd79137SRobert Mustacchi 528*7fd79137SRobert Mustacchi int 529*7fd79137SRobert Mustacchi mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp, 530*7fd79137SRobert Mustacchi int *errp) 531*7fd79137SRobert Mustacchi { 532*7fd79137SRobert Mustacchi int ret, i; 533*7fd79137SRobert Mustacchi boolean_t seterr = B_FALSE; 534*7fd79137SRobert Mustacchi 535*7fd79137SRobert Mustacchi if (mqp == NULL || func == NULL || outp == NULL) { 536*7fd79137SRobert Mustacchi return (mergeq_error(EINVAL)); 537*7fd79137SRobert Mustacchi } 538*7fd79137SRobert Mustacchi 539*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock)); 540*7fd79137SRobert Mustacchi if (mqp->mq_working == B_TRUE) { 541*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 542*7fd79137SRobert Mustacchi return (mergeq_error(EBUSY)); 543*7fd79137SRobert Mustacchi } 544*7fd79137SRobert Mustacchi 545*7fd79137SRobert Mustacchi if (mqp->mq_nitems == 0) { 546*7fd79137SRobert Mustacchi *outp = NULL; 547*7fd79137SRobert Mustacchi mergeq_reset(mqp); 548*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 549*7fd79137SRobert Mustacchi return (0); 550*7fd79137SRobert Mustacchi } 551*7fd79137SRobert Mustacchi 552*7fd79137SRobert Mustacchi /* 553*7fd79137SRobert Mustacchi * Now that we've finished adding items to the queue, turn it into a 554*7fd79137SRobert Mustacchi * circular buffer. 555*7fd79137SRobert Mustacchi */ 556*7fd79137SRobert Mustacchi mqp->mq_func = func; 557*7fd79137SRobert Mustacchi mqp->mq_arg = arg; 558*7fd79137SRobert Mustacchi mqp->mq_nproc = 0; 559*7fd79137SRobert Mustacchi mqp->mq_working = B_TRUE; 560*7fd79137SRobert Mustacchi if (mqp->mq_next == mqp->mq_cap) { 561*7fd79137SRobert Mustacchi mqp->mq_next %= mqp->mq_cap; 562*7fd79137SRobert Mustacchi mqp->mq_gnext++; 563*7fd79137SRobert Mustacchi } 564*7fd79137SRobert Mustacchi mqp->mq_ncommit = mqp->mq_next; 565*7fd79137SRobert Mustacchi 566*7fd79137SRobert Mustacchi ret = 0; 567*7fd79137SRobert Mustacchi for (i = 0; i < mqp->mq_ndthreads; i++) { 568*7fd79137SRobert Mustacchi ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0, 569*7fd79137SRobert Mustacchi &mqp->mq_thrs[i]); 570*7fd79137SRobert Mustacchi if (ret != 0) { 571*7fd79137SRobert Mustacchi mqp->mq_iserror = B_TRUE; 572*7fd79137SRobert Mustacchi break; 573*7fd79137SRobert Mustacchi } 574*7fd79137SRobert Mustacchi } 575*7fd79137SRobert Mustacchi 576*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 577*7fd79137SRobert Mustacchi if (ret == 0) 578*7fd79137SRobert Mustacchi (void) mergeq_thr_merge(mqp); 579*7fd79137SRobert Mustacchi 580*7fd79137SRobert Mustacchi for (i = 0; i < mqp->mq_ndthreads; i++) { 581*7fd79137SRobert Mustacchi VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL)); 582*7fd79137SRobert Mustacchi } 583*7fd79137SRobert Mustacchi 584*7fd79137SRobert Mustacchi VERIFY0(mutex_lock(&mqp->mq_lock)); 585*7fd79137SRobert Mustacchi 586*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nactthrs == 0); 587*7fd79137SRobert Mustacchi mqp->mq_working = B_FALSE; 588*7fd79137SRobert Mustacchi if (ret == 0 && mqp->mq_iserror == B_FALSE) { 589*7fd79137SRobert Mustacchi VERIFY(mqp->mq_nitems == 1); 590*7fd79137SRobert Mustacchi *outp = mergeq_pop_one(mqp); 591*7fd79137SRobert Mustacchi } else if (ret == 0 && mqp->mq_iserror == B_TRUE) { 592*7fd79137SRobert Mustacchi ret = MERGEQ_UERROR; 593*7fd79137SRobert Mustacchi if (errp != NULL) 594*7fd79137SRobert Mustacchi *errp = mqp->mq_error; 595*7fd79137SRobert Mustacchi } else { 596*7fd79137SRobert Mustacchi seterr = B_TRUE; 597*7fd79137SRobert Mustacchi } 598*7fd79137SRobert Mustacchi 599*7fd79137SRobert Mustacchi mergeq_reset(mqp); 600*7fd79137SRobert Mustacchi VERIFY0(mutex_unlock(&mqp->mq_lock)); 601*7fd79137SRobert Mustacchi 602*7fd79137SRobert Mustacchi if (seterr == B_TRUE) 603*7fd79137SRobert Mustacchi return (mergeq_error(ret)); 604*7fd79137SRobert Mustacchi 605*7fd79137SRobert Mustacchi return (ret); 606*7fd79137SRobert Mustacchi } 607