xref: /illumos-gate/usr/src/lib/mergeq/mergeq.c (revision 66582b606a8194f7f3ba5b3a3a6dca5b0d346361)
1 /*
2  * This file and its contents are supplied under the terms of the
3  * Common Development and Distribution License ("CDDL"), version 1.0.
4  * You may only use this file in accordance with the terms of version
5  * 1.0 of the CDDL.
6  *
7  * A full copy of the text of the CDDL should have accompanied this
8  * source.  A copy of the CDDL is also available via the Internet at
9  * http://www.illumos.org/license/CDDL.
10  */
11 
12 /*
13  * Copyright 2015 Joyent, Inc.
14  */
15 
16 /*
17  * Merge queue
18  *
19  * A multi-threaded merging queue.
20  *
21  * The general constraint of the merge queue is that if a set of items are
22  * inserted into the queue in the same order, then no matter how many threads
23  * are on the scene, we will always process the items in the same order. The
24  * secondary constraint is that to support environments that must be
25  * single-threaded, we explicitly *must not* create a thread in the case where
26  * the number of requested threads is just one.
27  *
28  * To that end, we've designed our queue as a circular buffer. We will grow that
29  * buffer to contain enough space for all the input items, after which we'll
30  * then treat it as a circular buffer.
31  *
32  * Items will be issued to a processing function two at a time, until there is
33  * only one item remaining in the queue, at which point we will be done with
34  * any merging work.
35  *
36  * A given queue has three different entries that we care about tracking:
37  *
38  * o mq_nproc   - What is the slot of the next item to process for something
39  *                looking for work.
40  *
41  * o mq_next    - What is the slot of the next item that should be inserted into
42  *                the queue.
43  *
44  * o mq_ncommit - What is the slot of the next item that should be committed.
45  *
46  * When a thread comes and looks for work, we pop entries off of the queue based
47  * on the index provided by mq_nproc. At the same time, it also gets the slot
48  * that it should place the result in, which is mq_next. However, because we
49  * have multiple threads that are operating on the system, we want to make sure
50  * that we push things onto the queue in order. We do that by allocating a slot
51  * to each task and when it completes, it waits for its slot to be ready based
52  * on it being the value of mq_ncommit.
53  *
54  * In addition, we keep track of the number of items in the queue as well as the
55  * number of active workers. There's also a generation count that is used to
56  * figure out when the various values might lap one another.
57  *
58  * The following images show what happens when we have a queue with six items
59  * and whose capacity has been shrunk to six, to better fit in the screen.
60  *
61  *
62  * 1) This is the initial configuration of the queue right before any processing
63  * is done in the context of mergeq_merge(). Every box has an initial item for
64  * merging in it (represented by an 'x'). Here, the mq_nproc, mq_next, and
65  * mq_ncommit will all point at the initial entry. However, the mq_next has
66  * already lapped around the array and thus has a generation count of one.
67  *
68  * The '+' characters indicate which bucket the corresponding values of
69  * mq_next, mq_ncommit, and mq_nproc point to.
70  *
71  *                     +---++---++---++---++---++---+
72  *                     | X || X || X || X || X || X |
73  *                     +---++---++---++---++---++---+
74  *        mq_next (g1)   +
75  *     mq_ncommit (g0)   +
76  *       mq_nproc (g0)   +
77  *
78  * 2) This shows the state right as the first thread begins to process an entry.
79  * Note in this example we will have two threads processing this queue. Note,
80  * mq_ncommit has not advanced. This is because the first thread has started
81  * processing entries, but it has not finished, and thus we can't commit it.
82  * We've incremented mq_next by one because it has gone ahead and assigned a
83  * single entry. We've incremented mq_nproc by two, because we have removed two
84  * entries and thus will have another set available.
85  *
86  *                     +---++---++---++---++---++---+     t1 - slot 0
87  *                     |   ||   || X || X || X || X |     t2 - idle
88  *                     +---++---++---++---++---++---+
89  *        mq_next (g1)        +
90  *     mq_ncommit (g0)   +
91  *       mq_nproc (g0)             +
92  *
93  *
94  * 3) This shows the state right after the second thread begins to process an
95  * entry, note that the first thread has not finished. The changes are very
96  * similar to the previous state, we've advanced, mq_nproc and mq_next, but not
97  * mq_ncommit.
98  *
99  *                     +---++---++---++---++---++---+     t1 - slot 0
100  *                     |   ||   ||   ||   || X || X |     t2 - slot 1
101  *                     +---++---++---++---++---++---+
102  *        mq_next (g1)             +
103  *     mq_ncommit (g0)   +
104  *       mq_nproc (g0)                       +
105  *
106  * 4) This shows the state after thread one has finished processing an item, but
107  * before it does anything else. Note that even if thread two finishes early, it
108  * cannot commit its item until thread one finishes. Here 'Y' refers to the
109  * result of merging the first two 'X's.
110  *
111  *                     +---++---++---++---++---++---+     t1 - idle
112  *                     | Y ||   ||   ||   || X || X |     t2 - slot 1
113  *                     +---++---++---++---++---++---+
114  *        mq_next (g1)             +
115  *     mq_ncommit (g0)        +
116  *       mq_nproc (g0)                       +
117  *
118  * 5) This shows the state after thread one has begun to process the next round
119  * and after thread two has committed, but before it begins processing the next
120  * item. Note that mq_nproc has wrapped around and we've bumped its generation
121  * counter.
122  *
123  *                     +---++---++---++---++---++---+     t1 - slot 2
124  *                     | Y || Y ||   ||   ||   ||   |     t2 - idle
125  *                     +---++---++---++---++---++---+
126  *        mq_next (g1)                  +
127  *     mq_ncommit (g0)             +
128  *       mq_nproc (g1)   +
129  *
130  * 6) Here, thread two, will take the next two Y values and thread 1 will commit
131  * its 'Y'. Thread one now must wait until thread two finishes such that it can
132  * do additional work.
133  *
134  *                     +---++---++---++---++---++---+     t1 - waiting
135  *                     |   ||   || Y ||   ||   ||   |     t2 - slot 3
136  *                     +---++---++---++---++---++---+
137  *        mq_next (g1)                       +
138  *     mq_ncommit (g0)                  +
139  *       mq_nproc (g1)             +
140  *
141  * 7) Here, thread two has committed and thread one is about to go process the
142  * final entry. The character 'Z' represents the results of merging two 'Y's.
143  *
144  *                     +---++---++---++---++---++---+     t1 - idle
145  *                     |   ||   || Y || Z ||   ||   |     t2 - idle
146  *                     +---++---++---++---++---++---+
147  *        mq_next (g1)                       +
148  *     mq_ncommit (g0)                       +
149  *       mq_nproc (g1)             +
150  *
151  * 8) Here, thread one is processing the final item. Thread two is waiting in
152  * mergeq_pop() for enough items to be available. In this case, it will never
153  * happen; however, once all threads have finished it will break out.
154  *
155  *                     +---++---++---++---++---++---+     t1 - slot 4
156  *                     |   ||   ||   ||   ||   ||   |     t2 - idle
157  *                     +---++---++---++---++---++---+
158  *        mq_next (g1)                            +
159  *     mq_ncommit (g0)                       +
160  *       mq_nproc (g1)                       +
161  *
162  * 9) This is the final state of the queue, it has a single '*' item which is
163  * the final merge result. At this point, both thread one and thread two would
164  * stop processing and we'll return the result to the user.
165  *
166  *                     +---++---++---++---++---++---+     t1 - slot 4
167  *                     |   ||   ||   ||   || * ||   |     t2 - idle
168  *                     +---++---++---++---++---++---+
169  *        mq_next (g1)                            +
170  *     mq_ncommit (g0)                       +
171  *       mq_nproc (g1)                       +
172  *
173  *
174  * Note, that if at any point in time the processing function fails, then all
175  * the merges will quiesce and that error will be propagated back to the user.
176  */
177 
178 #include <strings.h>
179 #include <sys/debug.h>
180 #include <thread.h>
181 #include <synch.h>
182 #include <errno.h>
183 #include <limits.h>
184 #include <stdlib.h>
185 
186 #include "mergeq.h"
187 
188 struct mergeq {
189 	mutex_t mq_lock;	/* Protects items below */
190 	cond_t	mq_cond;	/* Condition variable */
191 	void **mq_items;	/* Array of items to process */
192 	size_t mq_nitems;	/* Number of items in the queue */
193 	size_t mq_cap;		/* Capacity of the items */
194 	size_t mq_next;		/* Place to put next entry */
195 	size_t mq_gnext;	/* Generation for next */
196 	size_t mq_nproc;	/* Index of next thing to process */
197 	size_t mq_gnproc;	/* Generation for next proc */
198 	size_t mq_ncommit;	/* Index of the next thing to commit */
199 	size_t mq_gncommit;	/* Commit generation */
200 	uint_t mq_nactthrs;	/* Number of active threads */
201 	uint_t mq_ndthreads;	/* Desired number of threads */
202 	thread_t *mq_thrs;	/* Actual threads */
203 	mergeq_proc_f *mq_func;	/* Processing function */
204 	void *mq_arg;		/* Argument for processing */
205 	boolean_t mq_working;	/* Are we working on processing */
206 	boolean_t mq_iserror;	/* Have we encountered an error? */
207 	int mq_error;
208 };
209 
210 #define	MERGEQ_DEFAULT_CAP	64
211 
212 static int
213 mergeq_error(int err)
214 {
215 	errno = err;
216 	return (MERGEQ_ERROR);
217 }
218 
219 void
220 mergeq_fini(mergeq_t *mqp)
221 {
222 	if (mqp == NULL)
223 		return;
224 
225 	VERIFY(mqp->mq_working != B_TRUE);
226 
227 	if (mqp->mq_items != NULL)
228 		mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
229 	if (mqp->mq_ndthreads > 0) {
230 		mergeq_free(mqp->mq_thrs, sizeof (thread_t) *
231 		    mqp->mq_ndthreads);
232 	}
233 	VERIFY0(cond_destroy(&mqp->mq_cond));
234 	VERIFY0(mutex_destroy(&mqp->mq_lock));
235 	mergeq_free(mqp, sizeof (mergeq_t));
236 }
237 
238 int
239 mergeq_init(mergeq_t **outp, uint_t nthrs)
240 {
241 	int ret;
242 	mergeq_t *mqp;
243 
244 	mqp = mergeq_alloc(sizeof (mergeq_t));
245 	if (mqp == NULL)
246 		return (mergeq_error(ENOMEM));
247 
248 	bzero(mqp, sizeof (mergeq_t));
249 	mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP);
250 	if (mqp->mq_items == NULL) {
251 		mergeq_free(mqp, sizeof (mergeq_t));
252 		return (mergeq_error(ENOMEM));
253 	}
254 	bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP);
255 
256 	mqp->mq_ndthreads = nthrs - 1;
257 	if (mqp->mq_ndthreads > 0) {
258 		mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) *
259 		    mqp->mq_ndthreads);
260 		if (mqp->mq_thrs == NULL) {
261 			mergeq_free(mqp->mq_items, sizeof (void *) *
262 			    MERGEQ_DEFAULT_CAP);
263 			mergeq_free(mqp, sizeof (mergeq_t));
264 			return (mergeq_error(ENOMEM));
265 		}
266 	}
267 
268 	if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
269 	    NULL)) != 0) {
270 		if (mqp->mq_ndthreads > 0) {
271 			mergeq_free(mqp->mq_thrs,
272 			    sizeof (thread_t) * mqp->mq_ndthreads);
273 		}
274 		mergeq_free(mqp->mq_items, sizeof (void *) *
275 		    MERGEQ_DEFAULT_CAP);
276 		mergeq_free(mqp, sizeof (mergeq_t));
277 		return (mergeq_error(ret));
278 	}
279 
280 	if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) {
281 		VERIFY0(mutex_destroy(&mqp->mq_lock));
282 		if (mqp->mq_ndthreads > 0) {
283 			mergeq_free(mqp->mq_thrs,
284 			    sizeof (thread_t) * mqp->mq_ndthreads);
285 		}
286 		mergeq_free(mqp->mq_items, sizeof (void *) *
287 		    MERGEQ_DEFAULT_CAP);
288 		mergeq_free(mqp, sizeof (mergeq_t));
289 		return (mergeq_error(ret));
290 	}
291 
292 	mqp->mq_cap = MERGEQ_DEFAULT_CAP;
293 	*outp = mqp;
294 	return (0);
295 }
296 
297 static void
298 mergeq_reset(mergeq_t *mqp)
299 {
300 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
301 	VERIFY(mqp->mq_working == B_FALSE);
302 	if (mqp->mq_cap != 0)
303 		bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap);
304 	mqp->mq_nitems = 0;
305 	mqp->mq_next = 0;
306 	mqp->mq_gnext = 0;
307 	mqp->mq_nproc = 0;
308 	mqp->mq_gnproc = 0;
309 	mqp->mq_ncommit = 0;
310 	mqp->mq_gncommit = 0;
311 	mqp->mq_func = NULL;
312 	mqp->mq_arg = NULL;
313 	mqp->mq_iserror = B_FALSE;
314 	mqp->mq_error = 0;
315 }
316 
317 static int
318 mergeq_grow(mergeq_t *mqp)
319 {
320 	size_t ncap;
321 	void **items;
322 
323 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
324 	VERIFY(mqp->mq_working == B_FALSE);
325 
326 	if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP)
327 		return (ENOSPC);
328 
329 	ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP;
330 	items = mergeq_alloc(ncap * sizeof (void *));
331 	if (items == NULL)
332 		return (ENOMEM);
333 
334 	bzero(items, ncap * sizeof (void *));
335 	bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *));
336 	mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *));
337 	mqp->mq_items = items;
338 	mqp->mq_cap = ncap;
339 	return (0);
340 }
341 
342 int
343 mergeq_add(mergeq_t *mqp, void *item)
344 {
345 	VERIFY0(mutex_lock(&mqp->mq_lock));
346 	if (mqp->mq_working == B_TRUE) {
347 		VERIFY0(mutex_unlock(&mqp->mq_lock));
348 		return (mergeq_error(ENXIO));
349 	}
350 
351 	if (mqp->mq_next == mqp->mq_cap) {
352 		int ret;
353 
354 		if ((ret = mergeq_grow(mqp)) != 0) {
355 			VERIFY0(mutex_unlock(&mqp->mq_lock));
356 			return (mergeq_error(ret));
357 		}
358 	}
359 	mqp->mq_items[mqp->mq_next] = item;
360 	mqp->mq_next++;
361 	mqp->mq_nitems++;
362 
363 	VERIFY0(mutex_unlock(&mqp->mq_lock));
364 	return (0);
365 }
366 
367 static size_t
368 mergeq_slot(mergeq_t *mqp)
369 {
370 	size_t s;
371 
372 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
373 	VERIFY(mqp->mq_next < mqp->mq_cap);
374 
375 	/*
376 	 * This probably should be a cv / wait thing.
377 	 */
378 	VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap);
379 
380 	s = mqp->mq_next;
381 	mqp->mq_next++;
382 	if (mqp->mq_next == mqp->mq_cap) {
383 		mqp->mq_next %= mqp->mq_cap;
384 		mqp->mq_gnext++;
385 	}
386 
387 	return (s);
388 }
389 
390 /*
391  * Internal function to push items onto the queue which is now a circular
392  * buffer. This should only be used once we begin working on the queue.
393  */
394 static void
395 mergeq_push(mergeq_t *mqp, size_t slot, void *item)
396 {
397 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
398 	VERIFY(slot < mqp->mq_cap);
399 
400 	/*
401 	 * We need to verify that we don't push over something that exists.
402 	 * Based on the design, this should never happen. However, in the face
403 	 * of bugs, anything is possible.
404 	 */
405 	while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE)
406 		(void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
407 
408 	if (mqp->mq_iserror == B_TRUE)
409 		return;
410 
411 	mqp->mq_items[slot] = item;
412 	mqp->mq_nitems++;
413 	mqp->mq_ncommit++;
414 	if (mqp->mq_ncommit == mqp->mq_cap) {
415 		mqp->mq_ncommit %= mqp->mq_cap;
416 		mqp->mq_gncommit++;
417 	}
418 	(void) cond_broadcast(&mqp->mq_cond);
419 }
420 
421 static void *
422 mergeq_pop_one(mergeq_t *mqp)
423 {
424 	void *out;
425 
426 	/*
427 	 * We can't move mq_nproc beyond mq_next if they're on the same
428 	 * generation.
429 	 */
430 	VERIFY(mqp->mq_gnext != mqp->mq_gnproc ||
431 	    mqp->mq_nproc != mqp->mq_next);
432 
433 	out = mqp->mq_items[mqp->mq_nproc];
434 
435 	mqp->mq_items[mqp->mq_nproc] = NULL;
436 	mqp->mq_nproc++;
437 	if (mqp->mq_nproc == mqp->mq_cap) {
438 		mqp->mq_nproc %= mqp->mq_cap;
439 		mqp->mq_gnproc++;
440 	}
441 	mqp->mq_nitems--;
442 
443 	return (out);
444 }
445 
446 /*
447  * Pop a set of two entries from the queue. We may not have anything to process
448  * at the moment, eg. be waiting for someone to add something. In which case,
449  * we'll be sitting and waiting.
450  */
451 static boolean_t
452 mergeq_pop(mergeq_t *mqp, void **first, void **second)
453 {
454 	VERIFY(MUTEX_HELD(&mqp->mq_lock));
455 	VERIFY(mqp->mq_nproc < mqp->mq_cap);
456 
457 	while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 &&
458 	    mqp->mq_iserror == B_FALSE)
459 		(void) cond_wait(&mqp->mq_cond, &mqp->mq_lock);
460 
461 	if (mqp->mq_iserror == B_TRUE)
462 		return (B_FALSE);
463 
464 	if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) {
465 		VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1);
466 		return (B_FALSE);
467 	}
468 	VERIFY(mqp->mq_nitems >= 2);
469 
470 	*first = mergeq_pop_one(mqp);
471 	*second = mergeq_pop_one(mqp);
472 
473 	return (B_TRUE);
474 }
475 
476 static void *
477 mergeq_thr_merge(void *arg)
478 {
479 	mergeq_t *mqp = arg;
480 
481 	VERIFY0(mutex_lock(&mqp->mq_lock));
482 
483 	/*
484 	 * Check to make sure creation worked and if not, fail fast.
485 	 */
486 	if (mqp->mq_iserror == B_TRUE) {
487 		VERIFY0(mutex_unlock(&mqp->mq_lock));
488 		return (NULL);
489 	}
490 
491 	for (;;) {
492 		void *first, *second, *out;
493 		int ret;
494 		size_t slot;
495 
496 		if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) {
497 			VERIFY0(mutex_unlock(&mqp->mq_lock));
498 			return (NULL);
499 		}
500 
501 		if (mergeq_pop(mqp, &first, &second) == B_FALSE) {
502 			VERIFY0(mutex_unlock(&mqp->mq_lock));
503 			return (NULL);
504 		}
505 		slot = mergeq_slot(mqp);
506 
507 		mqp->mq_nactthrs++;
508 
509 		VERIFY0(mutex_unlock(&mqp->mq_lock));
510 		ret = mqp->mq_func(first, second, &out, mqp->mq_arg);
511 		VERIFY0(mutex_lock(&mqp->mq_lock));
512 
513 		if (ret != 0) {
514 			if (mqp->mq_iserror == B_FALSE) {
515 				mqp->mq_iserror = B_TRUE;
516 				mqp->mq_error = ret;
517 				(void) cond_broadcast(&mqp->mq_cond);
518 			}
519 			mqp->mq_nactthrs--;
520 			VERIFY0(mutex_unlock(&mqp->mq_lock));
521 			return (NULL);
522 		}
523 		mergeq_push(mqp, slot, out);
524 		mqp->mq_nactthrs--;
525 	}
526 }
527 
528 int
529 mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp,
530     int *errp)
531 {
532 	int ret, i;
533 	boolean_t seterr = B_FALSE;
534 
535 	if (mqp == NULL || func == NULL || outp == NULL) {
536 		return (mergeq_error(EINVAL));
537 	}
538 
539 	VERIFY0(mutex_lock(&mqp->mq_lock));
540 	if (mqp->mq_working == B_TRUE) {
541 		VERIFY0(mutex_unlock(&mqp->mq_lock));
542 		return (mergeq_error(EBUSY));
543 	}
544 
545 	if (mqp->mq_nitems == 0) {
546 		*outp = NULL;
547 		mergeq_reset(mqp);
548 		VERIFY0(mutex_unlock(&mqp->mq_lock));
549 		return (0);
550 	}
551 
552 	/*
553 	 * Now that we've finished adding items to the queue, turn it into a
554 	 * circular buffer.
555 	 */
556 	mqp->mq_func = func;
557 	mqp->mq_arg = arg;
558 	mqp->mq_nproc = 0;
559 	mqp->mq_working = B_TRUE;
560 	if (mqp->mq_next == mqp->mq_cap) {
561 		mqp->mq_next %= mqp->mq_cap;
562 		mqp->mq_gnext++;
563 	}
564 	mqp->mq_ncommit = mqp->mq_next;
565 
566 	ret = 0;
567 	for (i = 0; i < mqp->mq_ndthreads; i++) {
568 		ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0,
569 		    &mqp->mq_thrs[i]);
570 		if (ret != 0) {
571 			mqp->mq_iserror = B_TRUE;
572 			break;
573 		}
574 	}
575 
576 	VERIFY0(mutex_unlock(&mqp->mq_lock));
577 	if (ret == 0)
578 		(void) mergeq_thr_merge(mqp);
579 
580 	for (i = 0; i < mqp->mq_ndthreads; i++) {
581 		VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL));
582 	}
583 
584 	VERIFY0(mutex_lock(&mqp->mq_lock));
585 
586 	VERIFY(mqp->mq_nactthrs == 0);
587 	mqp->mq_working = B_FALSE;
588 	if (ret == 0 && mqp->mq_iserror == B_FALSE) {
589 		VERIFY(mqp->mq_nitems == 1);
590 		*outp = mergeq_pop_one(mqp);
591 	} else if (ret == 0 && mqp->mq_iserror == B_TRUE) {
592 		ret = MERGEQ_UERROR;
593 		if (errp != NULL)
594 			*errp = mqp->mq_error;
595 	} else {
596 		seterr = B_TRUE;
597 	}
598 
599 	mergeq_reset(mqp);
600 	VERIFY0(mutex_unlock(&mqp->mq_lock));
601 
602 	if (seterr == B_TRUE)
603 		return (mergeq_error(ret));
604 
605 	return (ret);
606 }
607