Lines Matching +full:time +full:- +full:slot

35  * be involved in a single merge at any given time, so the process decreases in
43 * takes the results of Phase I, and merges them two at a time. This disparity
44 * is due to an observation that the merge time increases at least quadratically
50 * That is, a merge should produce the same output every time, given the same
53 * are merged in the same order every time.
63 * thread, and are merged into wip array elements in round-robin order. When
64 * the number of files merged into a given array slot equals the batch size,
65 * the merged CTF graph in that array is added to the done slot in order by
66 * array slot.
101 * protected by separate mutexes - wq_queue_lock and wq_done_queue. wip
104 * when the thread restarts the loop. If the array slot was full, the
105 * array lock will be held while the slot contents are added to the done
106 * queue. The done queue lock is used to protect the wip slot cv's.
120 * array, worker threads remove two entries at a time from the beginning of
131 * finishes. We pre-determine the stopping point by pre-calculating the
151 * genunix are added to a third tdata_t - the uniquified tdata_t.
222 "Usage: %s [-fgstv] -l label | -L labelenv -o outfile file ...\n" in usage()
223 " %s [-fgstv] -l label | -L labelenv -o outfile -d uniqfile\n" in usage()
224 " %*s [-g] [-D uniqlabel] file ...\n" in usage()
225 " %s [-fgstv] -l label | -L labelenv -o outfile -w withfile " in usage()
227 " %s [-g] -c srcfile destfile\n" in usage()
229 " Note: if -L labelenv is specified and labelenv is not set in\n" in usage()
246 if ((sizes = getpagesizes(NULL, 0)) == -1) in bigheap()
252 if (getpagesizes(size, sizes) == -1) in bigheap()
255 while (size[sizes - 1] > maxpgsize) in bigheap()
256 sizes--; in bigheap()
259 big = size[sizes - 1]; in bigheap()
260 if (big & (big - 1)) { in bigheap()
271 if (brk((void *)((((uintptr_t)sbrk(0) - 1) & ~(big - 1)) + big)) != 0) in bigheap()
295 * order, starting with the slot containing the next batch that would in finalize_phase_one()
302 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) { in finalize_phase_one()
303 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) { in finalize_phase_one()
309 assert(startslot != -1); in finalize_phase_one()
311 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) { in finalize_phase_one()
312 int slotnum = i % wq->wq_nwipslots; in finalize_phase_one()
313 wip_t *wipslot = &wq->wq_wip[slotnum]; in finalize_phase_one()
315 if (wipslot->wip_td != NULL) { in finalize_phase_one()
316 debug(2, "clearing slot %d (%d) (saving %d)\n", in finalize_phase_one()
317 slotnum, i, wipslot->wip_nmerged); in finalize_phase_one()
319 debug(2, "clearing slot %d (%d)\n", slotnum, i); in finalize_phase_one()
321 if (wipslot->wip_td != NULL) { in finalize_phase_one()
322 fifo_add(wq->wq_donequeue, wipslot->wip_td); in finalize_phase_one()
323 wq->wq_wip[slotnum].wip_td = NULL; in finalize_phase_one()
327 wq->wq_lastdonebatch = wq->wq_next_batchid++; in finalize_phase_one()
330 fifo_len(wq->wq_donequeue)); in finalize_phase_one()
346 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue); in init_phase_two()
348 wq->wq_ninqueue += num / 2; in init_phase_two()
356 assert(fifo_len(wq->wq_queue) == 0); in init_phase_two()
357 fifo_free(wq->wq_queue, NULL); in init_phase_two()
358 wq->wq_queue = wq->wq_donequeue; in init_phase_two()
362 wip_save_work(workqueue_t *wq, wip_t *slot, int slotnum) in wip_save_work() argument
364 pthread_mutex_lock(&wq->wq_donequeue_lock); in wip_save_work()
366 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid) in wip_save_work()
367 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock); in wip_save_work()
368 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid); in wip_save_work()
370 fifo_add(wq->wq_donequeue, slot->wip_td); in wip_save_work()
371 wq->wq_lastdonebatch++; in wip_save_work()
372 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) % in wip_save_work()
373 wq->wq_nwipslots].wip_cv); in wip_save_work()
375 /* reset the slot for next use */ in wip_save_work()
376 slot->wip_td = NULL; in wip_save_work()
377 slot->wip_batchid = wq->wq_next_batchid++; in wip_save_work()
379 pthread_mutex_unlock(&wq->wq_donequeue_lock); in wip_save_work()
383 wip_add_work(wip_t *slot, tdata_t *pow) in wip_add_work() argument
385 if (slot->wip_td == NULL) { in wip_add_work()
386 slot->wip_td = pow; in wip_add_work()
387 slot->wip_nmerged = 1; in wip_add_work()
390 (void *)pow, (void *)slot->wip_td); in wip_add_work()
392 merge_into_master(pow, slot->wip_td, NULL, 0); in wip_add_work()
395 slot->wip_nmerged++; in wip_add_work()
407 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase1()
409 while (fifo_empty(wq->wq_queue)) { in worker_runphase1()
410 if (wq->wq_nomorefiles == 1) { in worker_runphase1()
411 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase1()
412 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
418 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase1()
419 &wq->wq_queue_lock); in worker_runphase1()
423 pow = fifo_remove(wq->wq_queue); in worker_runphase1()
424 pownum = wq->wq_nextpownum++; in worker_runphase1()
425 pthread_cond_broadcast(&wq->wq_work_removed); in worker_runphase1()
429 /* merge it into the right slot */ in worker_runphase1()
430 wipslotnum = pownum % wq->wq_nwipslots; in worker_runphase1()
431 wipslot = &wq->wq_wip[wipslotnum]; in worker_runphase1()
433 pthread_mutex_lock(&wipslot->wip_lock); in worker_runphase1()
435 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
439 if (wipslot->wip_nmerged == wq->wq_maxbatchsz) in worker_runphase1()
442 pthread_mutex_unlock(&wipslot->wip_lock); in worker_runphase1()
453 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
455 if (wq->wq_ninqueue == 1) { in worker_runphase2()
456 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase2()
457 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
461 if (barrier_wait(&wq->wq_bar1)) { in worker_runphase2()
462 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
463 wq->wq_alldone = 1; in worker_runphase2()
464 pthread_cond_signal(&wq->wq_alldone_cv); in worker_runphase2()
465 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
471 if (fifo_len(wq->wq_queue) < 2) { in worker_runphase2()
472 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase2()
473 &wq->wq_queue_lock); in worker_runphase2()
474 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
479 pow1 = fifo_remove(wq->wq_queue); in worker_runphase2()
480 pow2 = fifo_remove(wq->wq_queue); in worker_runphase2()
481 wq->wq_ninqueue -= 2; in worker_runphase2()
483 batchid = wq->wq_next_batchid++; in worker_runphase2()
485 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
496 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
497 while (wq->wq_lastdonebatch + 1 != batchid) { in worker_runphase2()
498 pthread_cond_wait(&wq->wq_done_cv, in worker_runphase2()
499 &wq->wq_queue_lock); in worker_runphase2()
502 wq->wq_lastdonebatch = batchid; in worker_runphase2()
504 fifo_add(wq->wq_queue, pow2); in worker_runphase2()
506 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue), in worker_runphase2()
507 wq->wq_ninqueue); in worker_runphase2()
508 pthread_cond_broadcast(&wq->wq_done_cv); in worker_runphase2()
509 pthread_cond_signal(&wq->wq_work_avail); in worker_runphase2()
510 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
524 if (barrier_wait(&wq->wq_bar1)) { in worker_thread()
533 wq->wq_ninqueue, fifo_len(wq->wq_queue)); in worker_thread()
538 (void) barrier_wait(&wq->wq_bar2); in worker_thread()
556 pthread_mutex_lock(&wq->wq_queue_lock); in merge_ctf_cb()
557 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) { in merge_ctf_cb()
559 fifo_len(wq->wq_queue), wq->wq_ithrottle); in merge_ctf_cb()
560 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock); in merge_ctf_cb()
563 fifo_add(wq->wq_queue, td); in merge_ctf_cb()
565 pthread_cond_broadcast(&wq->wq_work_avail); in merge_ctf_cb()
566 pthread_mutex_unlock(&wq->wq_queue_lock); in merge_ctf_cb()
573 * As such, in the event of a failure or user-initiated interrupt (^C), we need
574 * to ensure that a subsequent re-make will cause ctfmerge to be executed again.
585 * Another possibility would be to block SIGINT entirely - to always run to
586 * completion. The run time of ctfmerge can, however, be measured in minutes
592 terminate("Caught signal %d - exiting\n", sig); in handle_sig()
643 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE")); in wq_init()
645 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE; in wq_init()
647 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) / in wq_init()
648 wq->wq_maxbatchsz); in wq_init()
650 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots); in wq_init()
651 wq->wq_nwipslots = nslots; in wq_init()
652 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots); in wq_init()
653 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads); in wq_init()
659 wq->wq_ithrottle = throttle * wq->wq_nthreads; in wq_init()
661 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots, in wq_init()
662 wq->wq_nthreads); in wq_init()
664 wq->wq_next_batchid = 0; in wq_init()
667 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL); in wq_init()
668 pthread_cond_init(&wq->wq_wip[i].wip_cv, NULL); in wq_init()
669 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++; in wq_init()
672 pthread_mutex_init(&wq->wq_queue_lock, NULL); in wq_init()
673 wq->wq_queue = fifo_new(); in wq_init()
674 pthread_cond_init(&wq->wq_work_avail, NULL); in wq_init()
675 pthread_cond_init(&wq->wq_work_removed, NULL); in wq_init()
676 wq->wq_ninqueue = nfiles; in wq_init()
677 wq->wq_nextpownum = 0; in wq_init()
679 pthread_mutex_init(&wq->wq_donequeue_lock, NULL); in wq_init()
680 wq->wq_donequeue = fifo_new(); in wq_init()
681 wq->wq_lastdonebatch = -1; in wq_init()
683 pthread_cond_init(&wq->wq_done_cv, NULL); in wq_init()
685 pthread_cond_init(&wq->wq_alldone_cv, NULL); in wq_init()
686 wq->wq_alldone = 0; in wq_init()
688 barrier_init(&wq->wq_bar1, wq->wq_nthreads); in wq_init()
689 barrier_init(&wq->wq_bar2, wq->wq_nthreads); in wq_init()
691 wq->wq_nomorefiles = 0; in wq_init()
706 for (i = 0; i < wq->wq_nthreads; i++) { in start_threads()
707 pthread_create(&wq->wq_thread[i], NULL, in start_threads()
728 for (i = 0; i < wq->wq_nthreads; i++) { in join_threads()
729 pthread_join(wq->wq_thread[i], NULL); in join_threads()
747 * hard-to-debug failure modes.
832 if (argc - optind != 2) in main()
844 if (argc - optind == 0) in main()
872 * so we shoe-horn a copier into ctfmerge. in main()
883 nifiles = argc - optind; in main()
948 iidesc_stats(mstrtd->td_iihash); in main()
969 if (CTF_V3_TYPE_ISCHILD(reftd->td_nextid)) in main()
972 savetd->td_nextid = withfile ? reftd->td_nextid : in main()
993 savetd->td_parlabel = xstrdup(parle->le_name); in main()
996 uniqname[MAXPATHLEN - 1] = '\0'; in main()
997 savetd->td_parname = xstrdup(basename(uniqname)); in main()
1002 * No post processing. Write the merged tree as-is into the in main()