Lines Matching defs:wq
276 finalize_phase_one(workqueue_t *wq)
292 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) {
293 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) {
301 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) {
302 int slotnum = i % wq->wq_nwipslots;
303 wip_t *wipslot = &wq->wq_wip[slotnum];
312 fifo_add(wq->wq_donequeue, wipslot->wip_td);
313 wq->wq_wip[slotnum].wip_td = NULL;
317 wq->wq_lastdonebatch = wq->wq_next_batchid++;
320 fifo_len(wq->wq_donequeue));
324 init_phase_two(workqueue_t *wq)
336 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue);
338 wq->wq_ninqueue += num / 2;
346 assert(fifo_len(wq->wq_queue) == 0);
347 fifo_free(wq->wq_queue, NULL);
348 wq->wq_queue = wq->wq_donequeue;
352 wip_save_work(workqueue_t *wq, wip_t *slot, int slotnum)
354 pthread_mutex_lock(&wq->wq_donequeue_lock);
356 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid)
357 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock);
358 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid);
360 fifo_add(wq->wq_donequeue, slot->wip_td);
361 wq->wq_lastdonebatch++;
362 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) %
363 wq->wq_nwipslots].wip_cv);
367 slot->wip_batchid = wq->wq_next_batchid++;
369 pthread_mutex_unlock(&wq->wq_donequeue_lock);
390 worker_runphase1(workqueue_t *wq)
397 pthread_mutex_lock(&wq->wq_queue_lock);
399 while (fifo_empty(wq->wq_queue)) {
400 if (wq->wq_nomorefiles == 1) {
401 pthread_cond_broadcast(&wq->wq_work_avail);
402 pthread_mutex_unlock(&wq->wq_queue_lock);
408 pthread_cond_wait(&wq->wq_work_avail,
409 &wq->wq_queue_lock);
413 pow = fifo_remove(wq->wq_queue);
414 pownum = wq->wq_nextpownum++;
415 pthread_cond_broadcast(&wq->wq_work_removed);
420 wipslotnum = pownum % wq->wq_nwipslots;
421 wipslot = &wq->wq_wip[wipslotnum];
425 pthread_mutex_unlock(&wq->wq_queue_lock);
429 if (wipslot->wip_nmerged == wq->wq_maxbatchsz)
430 wip_save_work(wq, wipslot, wipslotnum);
437 worker_runphase2(workqueue_t *wq)
443 pthread_mutex_lock(&wq->wq_queue_lock);
445 if (wq->wq_ninqueue == 1) {
446 pthread_cond_broadcast(&wq->wq_work_avail);
447 pthread_mutex_unlock(&wq->wq_queue_lock);
451 if (barrier_wait(&wq->wq_bar1)) {
452 pthread_mutex_lock(&wq->wq_queue_lock);
453 wq->wq_alldone = 1;
454 pthread_cond_signal(&wq->wq_alldone_cv);
455 pthread_mutex_unlock(&wq->wq_queue_lock);
461 if (fifo_len(wq->wq_queue) < 2) {
462 pthread_cond_wait(&wq->wq_work_avail,
463 &wq->wq_queue_lock);
464 pthread_mutex_unlock(&wq->wq_queue_lock);
469 pow1 = fifo_remove(wq->wq_queue);
470 pow2 = fifo_remove(wq->wq_queue);
471 wq->wq_ninqueue -= 2;
473 batchid = wq->wq_next_batchid++;
475 pthread_mutex_unlock(&wq->wq_queue_lock);
486 pthread_mutex_lock(&wq->wq_queue_lock);
487 while (wq->wq_lastdonebatch + 1 != batchid) {
488 pthread_cond_wait(&wq->wq_done_cv,
489 &wq->wq_queue_lock);
492 wq->wq_lastdonebatch = batchid;
494 fifo_add(wq->wq_queue, pow2);
496 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue),
497 wq->wq_ninqueue);
498 pthread_cond_broadcast(&wq->wq_done_cv);
499 pthread_cond_signal(&wq->wq_work_avail);
500 pthread_mutex_unlock(&wq->wq_queue_lock);
508 worker_thread(workqueue_t *wq)
510 worker_runphase1(wq);
514 if (barrier_wait(&wq->wq_bar1)) {
518 finalize_phase_one(wq);
520 init_phase_two(wq);
523 wq->wq_ninqueue, fifo_len(wq->wq_queue));
528 (void) barrier_wait(&wq->wq_bar2);
532 worker_runphase2(wq);
542 workqueue_t *wq = arg;
546 pthread_mutex_lock(&wq->wq_queue_lock);
547 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) {
549 fifo_len(wq->wq_queue), wq->wq_ithrottle);
550 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock);
553 fifo_add(wq->wq_queue, td);
555 pthread_cond_broadcast(&wq->wq_work_avail);
556 pthread_mutex_unlock(&wq->wq_queue_lock);
621 wq_init(workqueue_t *wq, int nfiles)
631 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE"));
633 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE;
635 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) /
636 wq->wq_maxbatchsz);
638 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots);
639 wq->wq_nwipslots = nslots;
640 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots);
641 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads);
647 wq->wq_ithrottle = throttle * wq->wq_nthreads;
649 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots,
650 wq->wq_nthreads);
652 wq->wq_next_batchid = 0;
655 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL);
656 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++;
659 pthread_mutex_init(&wq->wq_queue_lock, NULL);
660 wq->wq_queue = fifo_new();
661 pthread_cond_init(&wq->wq_work_avail, NULL);
662 pthread_cond_init(&wq->wq_work_removed, NULL);
663 wq->wq_ninqueue = nfiles;
664 wq->wq_nextpownum = 0;
666 pthread_mutex_init(&wq->wq_donequeue_lock, NULL);
667 wq->wq_donequeue = fifo_new();
668 wq->wq_lastdonebatch = -1;
670 pthread_cond_init(&wq->wq_done_cv, NULL);
672 pthread_cond_init(&wq->wq_alldone_cv, NULL);
673 wq->wq_alldone = 0;
675 barrier_init(&wq->wq_bar1, wq->wq_nthreads);
676 barrier_init(&wq->wq_bar2, wq->wq_nthreads);
678 wq->wq_nomorefiles = 0;
682 start_threads(workqueue_t *wq)
693 for (i = 0; i < wq->wq_nthreads; i++) {
694 pthread_create(&wq->wq_thread[i], NULL,
695 (void *(*)(void *))worker_thread, wq);
705 join_threads(workqueue_t *wq)
709 for (i = 0; i < wq->wq_nthreads; i++) {
710 pthread_join(wq->wq_thread[i], NULL);
730 static workqueue_t wq;
879 wq_init(&wq, nielems);
881 start_threads(&wq);
890 &wq, require_ctf) == 0) {
901 pthread_mutex_lock(&wq.wq_queue_lock);
902 wq.wq_nomorefiles = 1;
903 pthread_cond_broadcast(&wq.wq_work_avail);
904 pthread_mutex_unlock(&wq.wq_queue_lock);
906 pthread_mutex_lock(&wq.wq_queue_lock);
907 while (wq.wq_alldone == 0)
908 pthread_cond_wait(&wq.wq_alldone_cv, &wq.wq_queue_lock);
909 pthread_mutex_unlock(&wq.wq_queue_lock);
911 join_threads(&wq);
924 assert(fifo_len(wq.wq_queue) == 1);
925 mstrtd = fifo_remove(wq.wq_queue);