Lines Matching refs:mqp
220 mergeq_fini(mergeq_t *mqp) in mergeq_fini() argument
222 if (mqp == NULL) in mergeq_fini()
225 VERIFY(mqp->mq_working != B_TRUE); in mergeq_fini()
227 if (mqp->mq_items != NULL) in mergeq_fini()
228 mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap); in mergeq_fini()
229 if (mqp->mq_ndthreads > 0) { in mergeq_fini()
230 mergeq_free(mqp->mq_thrs, sizeof (thread_t) * in mergeq_fini()
231 mqp->mq_ndthreads); in mergeq_fini()
233 VERIFY0(cond_destroy(&mqp->mq_cond)); in mergeq_fini()
234 VERIFY0(mutex_destroy(&mqp->mq_lock)); in mergeq_fini()
235 mergeq_free(mqp, sizeof (mergeq_t)); in mergeq_fini()
242 mergeq_t *mqp; in mergeq_init() local
244 mqp = mergeq_alloc(sizeof (mergeq_t)); in mergeq_init()
245 if (mqp == NULL) in mergeq_init()
248 bzero(mqp, sizeof (mergeq_t)); in mergeq_init()
249 mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP); in mergeq_init()
250 if (mqp->mq_items == NULL) { in mergeq_init()
251 mergeq_free(mqp, sizeof (mergeq_t)); in mergeq_init()
254 bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP); in mergeq_init()
256 mqp->mq_ndthreads = nthrs - 1; in mergeq_init()
257 if (mqp->mq_ndthreads > 0) { in mergeq_init()
258 mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) * in mergeq_init()
259 mqp->mq_ndthreads); in mergeq_init()
260 if (mqp->mq_thrs == NULL) { in mergeq_init()
261 mergeq_free(mqp->mq_items, sizeof (void *) * in mergeq_init()
263 mergeq_free(mqp, sizeof (mergeq_t)); in mergeq_init()
268 if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK, in mergeq_init()
270 if (mqp->mq_ndthreads > 0) { in mergeq_init()
271 mergeq_free(mqp->mq_thrs, in mergeq_init()
272 sizeof (thread_t) * mqp->mq_ndthreads); in mergeq_init()
274 mergeq_free(mqp->mq_items, sizeof (void *) * in mergeq_init()
276 mergeq_free(mqp, sizeof (mergeq_t)); in mergeq_init()
280 if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) { in mergeq_init()
281 VERIFY0(mutex_destroy(&mqp->mq_lock)); in mergeq_init()
282 if (mqp->mq_ndthreads > 0) { in mergeq_init()
283 mergeq_free(mqp->mq_thrs, in mergeq_init()
284 sizeof (thread_t) * mqp->mq_ndthreads); in mergeq_init()
286 mergeq_free(mqp->mq_items, sizeof (void *) * in mergeq_init()
288 mergeq_free(mqp, sizeof (mergeq_t)); in mergeq_init()
292 mqp->mq_cap = MERGEQ_DEFAULT_CAP; in mergeq_init()
293 *outp = mqp; in mergeq_init()
298 mergeq_reset(mergeq_t *mqp) in mergeq_reset() argument
300 VERIFY(MUTEX_HELD(&mqp->mq_lock)); in mergeq_reset()
301 VERIFY(mqp->mq_working == B_FALSE); in mergeq_reset()
302 if (mqp->mq_cap != 0) in mergeq_reset()
303 bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap); in mergeq_reset()
304 mqp->mq_nitems = 0; in mergeq_reset()
305 mqp->mq_next = 0; in mergeq_reset()
306 mqp->mq_gnext = 0; in mergeq_reset()
307 mqp->mq_nproc = 0; in mergeq_reset()
308 mqp->mq_gnproc = 0; in mergeq_reset()
309 mqp->mq_ncommit = 0; in mergeq_reset()
310 mqp->mq_gncommit = 0; in mergeq_reset()
311 mqp->mq_func = NULL; in mergeq_reset()
312 mqp->mq_arg = NULL; in mergeq_reset()
313 mqp->mq_iserror = B_FALSE; in mergeq_reset()
314 mqp->mq_error = 0; in mergeq_reset()
318 mergeq_grow(mergeq_t *mqp) in mergeq_grow() argument
323 VERIFY(MUTEX_HELD(&mqp->mq_lock)); in mergeq_grow()
324 VERIFY(mqp->mq_working == B_FALSE); in mergeq_grow()
326 if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP) in mergeq_grow()
329 ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP; in mergeq_grow()
335 bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *)); in mergeq_grow()
336 mergeq_free(mqp->mq_items, sizeof (mqp->mq_cap) * sizeof (void *)); in mergeq_grow()
337 mqp->mq_items = items; in mergeq_grow()
338 mqp->mq_cap = ncap; in mergeq_grow()
343 mergeq_add(mergeq_t *mqp, void *item) in mergeq_add() argument
345 VERIFY0(mutex_lock(&mqp->mq_lock)); in mergeq_add()
346 if (mqp->mq_working == B_TRUE) { in mergeq_add()
347 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_add()
351 if (mqp->mq_next == mqp->mq_cap) { in mergeq_add()
354 if ((ret = mergeq_grow(mqp)) != 0) { in mergeq_add()
355 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_add()
359 mqp->mq_items[mqp->mq_next] = item; in mergeq_add()
360 mqp->mq_next++; in mergeq_add()
361 mqp->mq_nitems++; in mergeq_add()
363 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_add()
368 mergeq_slot(mergeq_t *mqp) in mergeq_slot() argument
372 VERIFY(MUTEX_HELD(&mqp->mq_lock)); in mergeq_slot()
373 VERIFY(mqp->mq_next < mqp->mq_cap); in mergeq_slot()
378 VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap); in mergeq_slot()
380 s = mqp->mq_next; in mergeq_slot()
381 mqp->mq_next++; in mergeq_slot()
382 if (mqp->mq_next == mqp->mq_cap) { in mergeq_slot()
383 mqp->mq_next %= mqp->mq_cap; in mergeq_slot()
384 mqp->mq_gnext++; in mergeq_slot()
395 mergeq_push(mergeq_t *mqp, size_t slot, void *item) in mergeq_push() argument
397 VERIFY(MUTEX_HELD(&mqp->mq_lock)); in mergeq_push()
398 VERIFY(slot < mqp->mq_cap); in mergeq_push()
405 while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE) in mergeq_push()
406 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); in mergeq_push()
408 if (mqp->mq_iserror == B_TRUE) in mergeq_push()
411 mqp->mq_items[slot] = item; in mergeq_push()
412 mqp->mq_nitems++; in mergeq_push()
413 mqp->mq_ncommit++; in mergeq_push()
414 if (mqp->mq_ncommit == mqp->mq_cap) { in mergeq_push()
415 mqp->mq_ncommit %= mqp->mq_cap; in mergeq_push()
416 mqp->mq_gncommit++; in mergeq_push()
418 (void) cond_broadcast(&mqp->mq_cond); in mergeq_push()
422 mergeq_pop_one(mergeq_t *mqp) in mergeq_pop_one() argument
430 VERIFY(mqp->mq_gnext != mqp->mq_gnproc || in mergeq_pop_one()
431 mqp->mq_nproc != mqp->mq_next); in mergeq_pop_one()
433 out = mqp->mq_items[mqp->mq_nproc]; in mergeq_pop_one()
435 mqp->mq_items[mqp->mq_nproc] = NULL; in mergeq_pop_one()
436 mqp->mq_nproc++; in mergeq_pop_one()
437 if (mqp->mq_nproc == mqp->mq_cap) { in mergeq_pop_one()
438 mqp->mq_nproc %= mqp->mq_cap; in mergeq_pop_one()
439 mqp->mq_gnproc++; in mergeq_pop_one()
441 mqp->mq_nitems--; in mergeq_pop_one()
452 mergeq_pop(mergeq_t *mqp, void **first, void **second) in mergeq_pop() argument
454 VERIFY(MUTEX_HELD(&mqp->mq_lock)); in mergeq_pop()
455 VERIFY(mqp->mq_nproc < mqp->mq_cap); in mergeq_pop()
457 while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 && in mergeq_pop()
458 mqp->mq_iserror == B_FALSE) in mergeq_pop()
459 (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); in mergeq_pop()
461 if (mqp->mq_iserror == B_TRUE) in mergeq_pop()
464 if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) { in mergeq_pop()
465 VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1); in mergeq_pop()
468 VERIFY(mqp->mq_nitems >= 2); in mergeq_pop()
470 *first = mergeq_pop_one(mqp); in mergeq_pop()
471 *second = mergeq_pop_one(mqp); in mergeq_pop()
479 mergeq_t *mqp = arg; in mergeq_thr_merge() local
481 VERIFY0(mutex_lock(&mqp->mq_lock)); in mergeq_thr_merge()
486 if (mqp->mq_iserror == B_TRUE) { in mergeq_thr_merge()
487 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_thr_merge()
496 if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) { in mergeq_thr_merge()
497 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_thr_merge()
501 if (mergeq_pop(mqp, &first, &second) == B_FALSE) { in mergeq_thr_merge()
502 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_thr_merge()
505 slot = mergeq_slot(mqp); in mergeq_thr_merge()
507 mqp->mq_nactthrs++; in mergeq_thr_merge()
509 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_thr_merge()
510 ret = mqp->mq_func(first, second, &out, mqp->mq_arg); in mergeq_thr_merge()
511 VERIFY0(mutex_lock(&mqp->mq_lock)); in mergeq_thr_merge()
514 if (mqp->mq_iserror == B_FALSE) { in mergeq_thr_merge()
515 mqp->mq_iserror = B_TRUE; in mergeq_thr_merge()
516 mqp->mq_error = ret; in mergeq_thr_merge()
517 (void) cond_broadcast(&mqp->mq_cond); in mergeq_thr_merge()
519 mqp->mq_nactthrs--; in mergeq_thr_merge()
520 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_thr_merge()
523 mergeq_push(mqp, slot, out); in mergeq_thr_merge()
524 mqp->mq_nactthrs--; in mergeq_thr_merge()
529 mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp, in mergeq_merge() argument
535 if (mqp == NULL || func == NULL || outp == NULL) { in mergeq_merge()
539 VERIFY0(mutex_lock(&mqp->mq_lock)); in mergeq_merge()
540 if (mqp->mq_working == B_TRUE) { in mergeq_merge()
541 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_merge()
545 if (mqp->mq_nitems == 0) { in mergeq_merge()
547 mergeq_reset(mqp); in mergeq_merge()
548 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_merge()
556 mqp->mq_func = func; in mergeq_merge()
557 mqp->mq_arg = arg; in mergeq_merge()
558 mqp->mq_nproc = 0; in mergeq_merge()
559 mqp->mq_working = B_TRUE; in mergeq_merge()
560 if (mqp->mq_next == mqp->mq_cap) { in mergeq_merge()
561 mqp->mq_next %= mqp->mq_cap; in mergeq_merge()
562 mqp->mq_gnext++; in mergeq_merge()
564 mqp->mq_ncommit = mqp->mq_next; in mergeq_merge()
567 for (i = 0; i < mqp->mq_ndthreads; i++) { in mergeq_merge()
568 ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0, in mergeq_merge()
569 &mqp->mq_thrs[i]); in mergeq_merge()
571 mqp->mq_iserror = B_TRUE; in mergeq_merge()
576 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_merge()
578 (void) mergeq_thr_merge(mqp); in mergeq_merge()
580 for (i = 0; i < mqp->mq_ndthreads; i++) { in mergeq_merge()
581 VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL)); in mergeq_merge()
584 VERIFY0(mutex_lock(&mqp->mq_lock)); in mergeq_merge()
586 VERIFY(mqp->mq_nactthrs == 0); in mergeq_merge()
587 mqp->mq_working = B_FALSE; in mergeq_merge()
588 if (ret == 0 && mqp->mq_iserror == B_FALSE) { in mergeq_merge()
589 VERIFY(mqp->mq_nitems == 1); in mergeq_merge()
590 *outp = mergeq_pop_one(mqp); in mergeq_merge()
591 } else if (ret == 0 && mqp->mq_iserror == B_TRUE) { in mergeq_merge()
594 *errp = mqp->mq_error; in mergeq_merge()
599 mergeq_reset(mqp); in mergeq_merge()
600 VERIFY0(mutex_unlock(&mqp->mq_lock)); in mergeq_merge()