Lines Matching refs:wq
276 finalize_phase_one(workqueue_t *wq) in finalize_phase_one() argument
292 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) { in finalize_phase_one()
293 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) { in finalize_phase_one()
301 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) { in finalize_phase_one()
302 int slotnum = i % wq->wq_nwipslots; in finalize_phase_one()
303 wip_t *wipslot = &wq->wq_wip[slotnum]; in finalize_phase_one()
312 fifo_add(wq->wq_donequeue, wipslot->wip_td); in finalize_phase_one()
313 wq->wq_wip[slotnum].wip_td = NULL; in finalize_phase_one()
317 wq->wq_lastdonebatch = wq->wq_next_batchid++; in finalize_phase_one()
320 fifo_len(wq->wq_donequeue)); in finalize_phase_one()
324 init_phase_two(workqueue_t *wq) in init_phase_two() argument
336 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue); in init_phase_two()
338 wq->wq_ninqueue += num / 2; in init_phase_two()
346 assert(fifo_len(wq->wq_queue) == 0); in init_phase_two()
347 fifo_free(wq->wq_queue, NULL); in init_phase_two()
348 wq->wq_queue = wq->wq_donequeue; in init_phase_two()
352 wip_save_work(workqueue_t *wq, wip_t *slot, int slotnum) in wip_save_work() argument
354 pthread_mutex_lock(&wq->wq_donequeue_lock); in wip_save_work()
356 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid) in wip_save_work()
357 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock); in wip_save_work()
358 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid); in wip_save_work()
360 fifo_add(wq->wq_donequeue, slot->wip_td); in wip_save_work()
361 wq->wq_lastdonebatch++; in wip_save_work()
362 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) % in wip_save_work()
363 wq->wq_nwipslots].wip_cv); in wip_save_work()
367 slot->wip_batchid = wq->wq_next_batchid++; in wip_save_work()
369 pthread_mutex_unlock(&wq->wq_donequeue_lock); in wip_save_work()
390 worker_runphase1(workqueue_t *wq) in worker_runphase1() argument
397 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase1()
399 while (fifo_empty(wq->wq_queue)) { in worker_runphase1()
400 if (wq->wq_nomorefiles == 1) { in worker_runphase1()
401 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase1()
402 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
408 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase1()
409 &wq->wq_queue_lock); in worker_runphase1()
413 pow = fifo_remove(wq->wq_queue); in worker_runphase1()
414 pownum = wq->wq_nextpownum++; in worker_runphase1()
415 pthread_cond_broadcast(&wq->wq_work_removed); in worker_runphase1()
420 wipslotnum = pownum % wq->wq_nwipslots; in worker_runphase1()
421 wipslot = &wq->wq_wip[wipslotnum]; in worker_runphase1()
425 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
429 if (wipslot->wip_nmerged == wq->wq_maxbatchsz) in worker_runphase1()
430 wip_save_work(wq, wipslot, wipslotnum); in worker_runphase1()
437 worker_runphase2(workqueue_t *wq) in worker_runphase2() argument
443 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
445 if (wq->wq_ninqueue == 1) { in worker_runphase2()
446 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase2()
447 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
451 if (barrier_wait(&wq->wq_bar1)) { in worker_runphase2()
452 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
453 wq->wq_alldone = 1; in worker_runphase2()
454 pthread_cond_signal(&wq->wq_alldone_cv); in worker_runphase2()
455 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
461 if (fifo_len(wq->wq_queue) < 2) { in worker_runphase2()
462 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase2()
463 &wq->wq_queue_lock); in worker_runphase2()
464 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
469 pow1 = fifo_remove(wq->wq_queue); in worker_runphase2()
470 pow2 = fifo_remove(wq->wq_queue); in worker_runphase2()
471 wq->wq_ninqueue -= 2; in worker_runphase2()
473 batchid = wq->wq_next_batchid++; in worker_runphase2()
475 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
486 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
487 while (wq->wq_lastdonebatch + 1 != batchid) { in worker_runphase2()
488 pthread_cond_wait(&wq->wq_done_cv, in worker_runphase2()
489 &wq->wq_queue_lock); in worker_runphase2()
492 wq->wq_lastdonebatch = batchid; in worker_runphase2()
494 fifo_add(wq->wq_queue, pow2); in worker_runphase2()
496 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue), in worker_runphase2()
497 wq->wq_ninqueue); in worker_runphase2()
498 pthread_cond_broadcast(&wq->wq_done_cv); in worker_runphase2()
499 pthread_cond_signal(&wq->wq_work_avail); in worker_runphase2()
500 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
508 worker_thread(workqueue_t *wq) in worker_thread() argument
510 worker_runphase1(wq); in worker_thread()
514 if (barrier_wait(&wq->wq_bar1)) { in worker_thread()
518 finalize_phase_one(wq); in worker_thread()
520 init_phase_two(wq); in worker_thread()
523 wq->wq_ninqueue, fifo_len(wq->wq_queue)); in worker_thread()
528 (void) barrier_wait(&wq->wq_bar2); in worker_thread()
532 worker_runphase2(wq); in worker_thread()
542 workqueue_t *wq = arg; in merge_ctf_cb() local
546 pthread_mutex_lock(&wq->wq_queue_lock); in merge_ctf_cb()
547 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) { in merge_ctf_cb()
549 fifo_len(wq->wq_queue), wq->wq_ithrottle); in merge_ctf_cb()
550 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock); in merge_ctf_cb()
553 fifo_add(wq->wq_queue, td); in merge_ctf_cb()
555 pthread_cond_broadcast(&wq->wq_work_avail); in merge_ctf_cb()
556 pthread_mutex_unlock(&wq->wq_queue_lock); in merge_ctf_cb()
621 wq_init(workqueue_t *wq, int nfiles) in wq_init() argument
631 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE")); in wq_init()
633 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE; in wq_init()
635 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) / in wq_init()
636 wq->wq_maxbatchsz); in wq_init()
638 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots); in wq_init()
639 wq->wq_nwipslots = nslots; in wq_init()
640 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots); in wq_init()
641 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads); in wq_init()
647 wq->wq_ithrottle = throttle * wq->wq_nthreads; in wq_init()
649 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots, in wq_init()
650 wq->wq_nthreads); in wq_init()
652 wq->wq_next_batchid = 0; in wq_init()
655 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL); in wq_init()
656 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++; in wq_init()
659 pthread_mutex_init(&wq->wq_queue_lock, NULL); in wq_init()
660 wq->wq_queue = fifo_new(); in wq_init()
661 pthread_cond_init(&wq->wq_work_avail, NULL); in wq_init()
662 pthread_cond_init(&wq->wq_work_removed, NULL); in wq_init()
663 wq->wq_ninqueue = nfiles; in wq_init()
664 wq->wq_nextpownum = 0; in wq_init()
666 pthread_mutex_init(&wq->wq_donequeue_lock, NULL); in wq_init()
667 wq->wq_donequeue = fifo_new(); in wq_init()
668 wq->wq_lastdonebatch = -1; in wq_init()
670 pthread_cond_init(&wq->wq_done_cv, NULL); in wq_init()
672 pthread_cond_init(&wq->wq_alldone_cv, NULL); in wq_init()
673 wq->wq_alldone = 0; in wq_init()
675 barrier_init(&wq->wq_bar1, wq->wq_nthreads); in wq_init()
676 barrier_init(&wq->wq_bar2, wq->wq_nthreads); in wq_init()
678 wq->wq_nomorefiles = 0; in wq_init()
682 start_threads(workqueue_t *wq) in start_threads() argument
693 for (i = 0; i < wq->wq_nthreads; i++) { in start_threads()
694 pthread_create(&wq->wq_thread[i], NULL, in start_threads()
695 (void *(*)(void *))worker_thread, wq); in start_threads()
705 join_threads(workqueue_t *wq) in join_threads() argument
709 for (i = 0; i < wq->wq_nthreads; i++) { in join_threads()
710 pthread_join(wq->wq_thread[i], NULL); in join_threads()
730 static workqueue_t wq; variable
879 wq_init(&wq, nielems); in main()
881 start_threads(&wq); in main()
890 &wq, require_ctf) == 0) { in main()
901 pthread_mutex_lock(&wq.wq_queue_lock); in main()
902 wq.wq_nomorefiles = 1; in main()
903 pthread_cond_broadcast(&wq.wq_work_avail); in main()
904 pthread_mutex_unlock(&wq.wq_queue_lock); in main()
906 pthread_mutex_lock(&wq.wq_queue_lock); in main()
907 while (wq.wq_alldone == 0) in main()
908 pthread_cond_wait(&wq.wq_alldone_cv, &wq.wq_queue_lock); in main()
909 pthread_mutex_unlock(&wq.wq_queue_lock); in main()
911 join_threads(&wq); in main()
924 assert(fifo_len(wq.wq_queue) == 1); in main()
925 mstrtd = fifo_remove(wq.wq_queue); in main()