Lines Matching refs:wq
286 finalize_phase_one(workqueue_t *wq) in finalize_phase_one() argument
302 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) { in finalize_phase_one()
303 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) { in finalize_phase_one()
311 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) { in finalize_phase_one()
312 int slotnum = i % wq->wq_nwipslots; in finalize_phase_one()
313 wip_t *wipslot = &wq->wq_wip[slotnum]; in finalize_phase_one()
322 fifo_add(wq->wq_donequeue, wipslot->wip_td); in finalize_phase_one()
323 wq->wq_wip[slotnum].wip_td = NULL; in finalize_phase_one()
327 wq->wq_lastdonebatch = wq->wq_next_batchid++; in finalize_phase_one()
330 fifo_len(wq->wq_donequeue)); in finalize_phase_one()
334 init_phase_two(workqueue_t *wq) in init_phase_two() argument
346 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue); in init_phase_two()
348 wq->wq_ninqueue += num / 2; in init_phase_two()
356 assert(fifo_len(wq->wq_queue) == 0); in init_phase_two()
357 fifo_free(wq->wq_queue, NULL); in init_phase_two()
358 wq->wq_queue = wq->wq_donequeue; in init_phase_two()
362 wip_save_work(workqueue_t *wq, wip_t *slot, int slotnum) in wip_save_work() argument
364 pthread_mutex_lock(&wq->wq_donequeue_lock); in wip_save_work()
366 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid) in wip_save_work()
367 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock); in wip_save_work()
368 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid); in wip_save_work()
370 fifo_add(wq->wq_donequeue, slot->wip_td); in wip_save_work()
371 wq->wq_lastdonebatch++; in wip_save_work()
372 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) % in wip_save_work()
373 wq->wq_nwipslots].wip_cv); in wip_save_work()
377 slot->wip_batchid = wq->wq_next_batchid++; in wip_save_work()
379 pthread_mutex_unlock(&wq->wq_donequeue_lock); in wip_save_work()
400 worker_runphase1(workqueue_t *wq) in worker_runphase1() argument
407 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase1()
409 while (fifo_empty(wq->wq_queue)) { in worker_runphase1()
410 if (wq->wq_nomorefiles == 1) { in worker_runphase1()
411 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase1()
412 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
418 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase1()
419 &wq->wq_queue_lock); in worker_runphase1()
423 pow = fifo_remove(wq->wq_queue); in worker_runphase1()
424 pownum = wq->wq_nextpownum++; in worker_runphase1()
425 pthread_cond_broadcast(&wq->wq_work_removed); in worker_runphase1()
430 wipslotnum = pownum % wq->wq_nwipslots; in worker_runphase1()
431 wipslot = &wq->wq_wip[wipslotnum]; in worker_runphase1()
435 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
439 if (wipslot->wip_nmerged == wq->wq_maxbatchsz) in worker_runphase1()
440 wip_save_work(wq, wipslot, wipslotnum); in worker_runphase1()
447 worker_runphase2(workqueue_t *wq) in worker_runphase2() argument
453 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
455 if (wq->wq_ninqueue == 1) { in worker_runphase2()
456 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase2()
457 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
461 if (barrier_wait(&wq->wq_bar1)) { in worker_runphase2()
462 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
463 wq->wq_alldone = 1; in worker_runphase2()
464 pthread_cond_signal(&wq->wq_alldone_cv); in worker_runphase2()
465 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
471 if (fifo_len(wq->wq_queue) < 2) { in worker_runphase2()
472 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase2()
473 &wq->wq_queue_lock); in worker_runphase2()
474 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
479 pow1 = fifo_remove(wq->wq_queue); in worker_runphase2()
480 pow2 = fifo_remove(wq->wq_queue); in worker_runphase2()
481 wq->wq_ninqueue -= 2; in worker_runphase2()
483 batchid = wq->wq_next_batchid++; in worker_runphase2()
485 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
496 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
497 while (wq->wq_lastdonebatch + 1 != batchid) { in worker_runphase2()
498 pthread_cond_wait(&wq->wq_done_cv, in worker_runphase2()
499 &wq->wq_queue_lock); in worker_runphase2()
502 wq->wq_lastdonebatch = batchid; in worker_runphase2()
504 fifo_add(wq->wq_queue, pow2); in worker_runphase2()
506 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue), in worker_runphase2()
507 wq->wq_ninqueue); in worker_runphase2()
508 pthread_cond_broadcast(&wq->wq_done_cv); in worker_runphase2()
509 pthread_cond_signal(&wq->wq_work_avail); in worker_runphase2()
510 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
518 worker_thread(workqueue_t *wq) in worker_thread() argument
520 worker_runphase1(wq); in worker_thread()
524 if (barrier_wait(&wq->wq_bar1)) { in worker_thread()
528 finalize_phase_one(wq); in worker_thread()
530 init_phase_two(wq); in worker_thread()
533 wq->wq_ninqueue, fifo_len(wq->wq_queue)); in worker_thread()
538 (void) barrier_wait(&wq->wq_bar2); in worker_thread()
542 worker_runphase2(wq); in worker_thread()
552 workqueue_t *wq = arg; in merge_ctf_cb() local
556 pthread_mutex_lock(&wq->wq_queue_lock); in merge_ctf_cb()
557 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) { in merge_ctf_cb()
559 fifo_len(wq->wq_queue), wq->wq_ithrottle); in merge_ctf_cb()
560 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock); in merge_ctf_cb()
563 fifo_add(wq->wq_queue, td); in merge_ctf_cb()
565 pthread_cond_broadcast(&wq->wq_work_avail); in merge_ctf_cb()
566 pthread_mutex_unlock(&wq->wq_queue_lock); in merge_ctf_cb()
633 wq_init(workqueue_t *wq, int nfiles) in wq_init() argument
643 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE")); in wq_init()
645 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE; in wq_init()
647 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) / in wq_init()
648 wq->wq_maxbatchsz); in wq_init()
650 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots); in wq_init()
651 wq->wq_nwipslots = nslots; in wq_init()
652 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots); in wq_init()
653 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads); in wq_init()
659 wq->wq_ithrottle = throttle * wq->wq_nthreads; in wq_init()
661 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots, in wq_init()
662 wq->wq_nthreads); in wq_init()
664 wq->wq_next_batchid = 0; in wq_init()
667 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL); in wq_init()
668 pthread_cond_init(&wq->wq_wip[i].wip_cv, NULL); in wq_init()
669 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++; in wq_init()
672 pthread_mutex_init(&wq->wq_queue_lock, NULL); in wq_init()
673 wq->wq_queue = fifo_new(); in wq_init()
674 pthread_cond_init(&wq->wq_work_avail, NULL); in wq_init()
675 pthread_cond_init(&wq->wq_work_removed, NULL); in wq_init()
676 wq->wq_ninqueue = nfiles; in wq_init()
677 wq->wq_nextpownum = 0; in wq_init()
679 pthread_mutex_init(&wq->wq_donequeue_lock, NULL); in wq_init()
680 wq->wq_donequeue = fifo_new(); in wq_init()
681 wq->wq_lastdonebatch = -1; in wq_init()
683 pthread_cond_init(&wq->wq_done_cv, NULL); in wq_init()
685 pthread_cond_init(&wq->wq_alldone_cv, NULL); in wq_init()
686 wq->wq_alldone = 0; in wq_init()
688 barrier_init(&wq->wq_bar1, wq->wq_nthreads); in wq_init()
689 barrier_init(&wq->wq_bar2, wq->wq_nthreads); in wq_init()
691 wq->wq_nomorefiles = 0; in wq_init()
695 start_threads(workqueue_t *wq) in start_threads() argument
706 for (i = 0; i < wq->wq_nthreads; i++) { in start_threads()
707 pthread_create(&wq->wq_thread[i], NULL, in start_threads()
708 (void *(*)(void *))worker_thread, wq); in start_threads()
724 join_threads(workqueue_t *wq) in join_threads() argument
728 for (i = 0; i < wq->wq_nthreads; i++) { in join_threads()
729 pthread_join(wq->wq_thread[i], NULL); in join_threads()
749 static workqueue_t wq; variable
903 wq_init(&wq, nielems); in main()
905 start_threads(&wq); in main()
914 &wq, require_ctf) == 0) { in main()
919 pthread_mutex_lock(&wq.wq_queue_lock); in main()
920 wq.wq_nomorefiles = 1; in main()
921 pthread_cond_broadcast(&wq.wq_work_avail); in main()
922 pthread_mutex_unlock(&wq.wq_queue_lock); in main()
924 pthread_mutex_lock(&wq.wq_queue_lock); in main()
925 while (wq.wq_alldone == 0) in main()
926 pthread_cond_wait(&wq.wq_alldone_cv, &wq.wq_queue_lock); in main()
927 pthread_mutex_unlock(&wq.wq_queue_lock); in main()
929 join_threads(&wq); in main()
942 assert(fifo_len(wq.wq_queue) == 1); in main()
943 mstrtd = fifo_remove(wq.wq_queue); in main()