Lines Matching +full:single +full:- +full:phase

30  * a single CTF section in an output file.
34 * data from a single input file, or the result of one or more merges) can only
35 * be involved in a single merge at any given time, so the process decreases in
37 * consolidated, finally resulting in a single merge of two large CTF graphs.
42 * phase entails the merging of CTF data in groups of eight. The second phase
43 * takes the results of Phase I, and merges them two at a time. This disparity
55 * Phase I
61 * Central to Phase I is the Work In Progress (wip) array, which is used to
63 * thread, and are merged into wip array elements in round-robin order. When
93 * this point, a pair of barriers are used to allow a single thread to move
95 * When this is complete, wq_donequeue is moved to wq_queue, and Phase II
98 * Locking Semantics (Phase I)
101 * protected by separate mutexes - wq_queue_lock and wq_donequeue_lock. wip
110 * Phase I* by the done queue lock.
112 * Phase II
114 * When Phase II begins, the queue consists of the merged batches from the
115 * first phase. Assume we have five batches:
119 * Using the same batch ID mechanism we used in Phase I, but without the wip
130 * When one entry remains on the queue, with no merges outstanding, Phase II
131 * finishes. We pre-determine the stopping point by pre-calculating the
133 * number (wq_ninqueue) is 9. When ninqueue is 1, we conclude Phase II by
136 * Locking Semantics (Phase II)
140 * queue and corresponding lock are unused in Phase II as is the wip array.
151 * genunix are added to a third tdata_t - the uniquified tdata_t.
222 "Usage: %s [-fgstv] -l label | -L labelenv -o outfile file ...\n" in usage()
223 " %s [-fgstv] -l label | -L labelenv -o outfile -d uniqfile\n" in usage()
224 " %*s [-g] [-D uniqlabel] file ...\n" in usage()
225 " %s [-fgstv] -l label | -L labelenv -o outfile -w withfile " in usage()
227 " %s [-g] -c srcfile destfile\n" in usage()
229 " Note: if -L labelenv is specified and labelenv is not set in\n" in usage()
246 if ((sizes = getpagesizes(NULL, 0)) == -1) in bigheap()
252 if (getpagesizes(size, sizes) == -1) in bigheap()
255 while (size[sizes - 1] > maxpgsize) in bigheap()
256 sizes--; in bigheap()
259 big = size[sizes - 1]; in bigheap()
260 if (big & (big - 1)) { in bigheap()
271 if (brk((void *)((((uintptr_t)sbrk(0) - 1) & ~(big - 1)) + big)) != 0) in bigheap()
302 for (startslot = -1, i = 0; i < wq->wq_nwipslots; i++) { in finalize_phase_one()
303 if (wq->wq_wip[i].wip_batchid == wq->wq_lastdonebatch + 1) { in finalize_phase_one()
309 assert(startslot != -1); in finalize_phase_one()
311 for (i = startslot; i < startslot + wq->wq_nwipslots; i++) { in finalize_phase_one()
312 int slotnum = i % wq->wq_nwipslots; in finalize_phase_one()
313 wip_t *wipslot = &wq->wq_wip[slotnum]; in finalize_phase_one()
315 if (wipslot->wip_td != NULL) { in finalize_phase_one()
317 slotnum, i, wipslot->wip_nmerged); in finalize_phase_one()
321 if (wipslot->wip_td != NULL) { in finalize_phase_one()
322 fifo_add(wq->wq_donequeue, wipslot->wip_td); in finalize_phase_one()
323 wq->wq_wip[slotnum].wip_td = NULL; in finalize_phase_one()
327 wq->wq_lastdonebatch = wq->wq_next_batchid++; in finalize_phase_one()
329 debug(2, "phase one done: donequeue has %d items\n", in finalize_phase_one()
330 fifo_len(wq->wq_donequeue)); in finalize_phase_one()
344 * phase and as generated by merges during the phase. in init_phase_two()
346 wq->wq_ninqueue = num = fifo_len(wq->wq_donequeue); in init_phase_two()
348 wq->wq_ninqueue += num / 2; in init_phase_two()
354 * queue in phase 2. in init_phase_two()
356 assert(fifo_len(wq->wq_queue) == 0); in init_phase_two()
357 fifo_free(wq->wq_queue, NULL); in init_phase_two()
358 wq->wq_queue = wq->wq_donequeue; in init_phase_two()
364 pthread_mutex_lock(&wq->wq_donequeue_lock); in wip_save_work()
366 while (wq->wq_lastdonebatch + 1 < slot->wip_batchid) in wip_save_work()
367 pthread_cond_wait(&slot->wip_cv, &wq->wq_donequeue_lock); in wip_save_work()
368 assert(wq->wq_lastdonebatch + 1 == slot->wip_batchid); in wip_save_work()
370 fifo_add(wq->wq_donequeue, slot->wip_td); in wip_save_work()
371 wq->wq_lastdonebatch++; in wip_save_work()
372 pthread_cond_signal(&wq->wq_wip[(slotnum + 1) % in wip_save_work()
373 wq->wq_nwipslots].wip_cv); in wip_save_work()
376 slot->wip_td = NULL; in wip_save_work()
377 slot->wip_batchid = wq->wq_next_batchid++; in wip_save_work()
379 pthread_mutex_unlock(&wq->wq_donequeue_lock); in wip_save_work()
385 if (slot->wip_td == NULL) { in wip_add_work()
386 slot->wip_td = pow; in wip_add_work()
387 slot->wip_nmerged = 1; in wip_add_work()
390 (void *)pow, (void *)slot->wip_td); in wip_add_work()
392 merge_into_master(pow, slot->wip_td, NULL, 0); in wip_add_work()
395 slot->wip_nmerged++; in wip_add_work()
407 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase1()
409 while (fifo_empty(wq->wq_queue)) { in worker_runphase1()
410 if (wq->wq_nomorefiles == 1) { in worker_runphase1()
411 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase1()
412 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
414 /* on to phase 2 ... */ in worker_runphase1()
418 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase1()
419 &wq->wq_queue_lock); in worker_runphase1()
423 pow = fifo_remove(wq->wq_queue); in worker_runphase1()
424 pownum = wq->wq_nextpownum++; in worker_runphase1()
425 pthread_cond_broadcast(&wq->wq_work_removed); in worker_runphase1()
430 wipslotnum = pownum % wq->wq_nwipslots; in worker_runphase1()
431 wipslot = &wq->wq_wip[wipslotnum]; in worker_runphase1()
433 pthread_mutex_lock(&wipslot->wip_lock); in worker_runphase1()
435 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase1()
439 if (wipslot->wip_nmerged == wq->wq_maxbatchsz) in worker_runphase1()
442 pthread_mutex_unlock(&wipslot->wip_lock); in worker_runphase1()
453 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
455 if (wq->wq_ninqueue == 1) { in worker_runphase2()
456 pthread_cond_broadcast(&wq->wq_work_avail); in worker_runphase2()
457 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
461 if (barrier_wait(&wq->wq_bar1)) { in worker_runphase2()
462 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
463 wq->wq_alldone = 1; in worker_runphase2()
464 pthread_cond_signal(&wq->wq_alldone_cv); in worker_runphase2()
465 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
471 if (fifo_len(wq->wq_queue) < 2) { in worker_runphase2()
472 pthread_cond_wait(&wq->wq_work_avail, in worker_runphase2()
473 &wq->wq_queue_lock); in worker_runphase2()
474 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
479 pow1 = fifo_remove(wq->wq_queue); in worker_runphase2()
480 pow2 = fifo_remove(wq->wq_queue); in worker_runphase2()
481 wq->wq_ninqueue -= 2; in worker_runphase2()
483 batchid = wq->wq_next_batchid++; in worker_runphase2()
485 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
496 pthread_mutex_lock(&wq->wq_queue_lock); in worker_runphase2()
497 while (wq->wq_lastdonebatch + 1 != batchid) { in worker_runphase2()
498 pthread_cond_wait(&wq->wq_done_cv, in worker_runphase2()
499 &wq->wq_queue_lock); in worker_runphase2()
502 wq->wq_lastdonebatch = batchid; in worker_runphase2()
504 fifo_add(wq->wq_queue, pow2); in worker_runphase2()
506 pthread_self(), (void *)pow2, fifo_len(wq->wq_queue), in worker_runphase2()
507 wq->wq_ninqueue); in worker_runphase2()
508 pthread_cond_broadcast(&wq->wq_done_cv); in worker_runphase2()
509 pthread_cond_signal(&wq->wq_work_avail); in worker_runphase2()
510 pthread_mutex_unlock(&wq->wq_queue_lock); in worker_runphase2()
524 if (barrier_wait(&wq->wq_bar1)) { in worker_thread()
533 wq->wq_ninqueue, fifo_len(wq->wq_queue)); in worker_thread()
538 (void) barrier_wait(&wq->wq_bar2); in worker_thread()
540 debug(2, "%d: phase 1 complete\n", pthread_self()); in worker_thread()
556 pthread_mutex_lock(&wq->wq_queue_lock); in merge_ctf_cb()
557 while (fifo_len(wq->wq_queue) > wq->wq_ithrottle) { in merge_ctf_cb()
559 fifo_len(wq->wq_queue), wq->wq_ithrottle); in merge_ctf_cb()
560 pthread_cond_wait(&wq->wq_work_removed, &wq->wq_queue_lock); in merge_ctf_cb()
563 fifo_add(wq->wq_queue, td); in merge_ctf_cb()
565 pthread_cond_broadcast(&wq->wq_work_avail); in merge_ctf_cb()
566 pthread_mutex_unlock(&wq->wq_queue_lock); in merge_ctf_cb()
573 * As such, in the event of a failure or user-initiated interrupt (^C), we need
574 * to ensure that a subsequent re-make will cause ctfmerge to be executed again.
585 * Another possibility would be to block SIGINT entirely - to always run to
592 terminate("Caught signal %d - exiting\n", sig); in handle_sig()
643 wq->wq_maxbatchsz = atoi(getenv("CTFMERGE_PHASE1_BATCH_SIZE")); in wq_init()
645 wq->wq_maxbatchsz = MERGE_PHASE1_BATCH_SIZE; in wq_init()
647 nslots = MIN(nslots, (nfiles + wq->wq_maxbatchsz - 1) / in wq_init()
648 wq->wq_maxbatchsz); in wq_init()
650 wq->wq_wip = xcalloc(sizeof (wip_t) * nslots); in wq_init()
651 wq->wq_nwipslots = nslots; in wq_init()
652 wq->wq_nthreads = MIN(sysconf(_SC_NPROCESSORS_ONLN) * 3 / 2, nslots); in wq_init()
653 wq->wq_thread = xmalloc(sizeof (pthread_t) * wq->wq_nthreads); in wq_init()
659 wq->wq_ithrottle = throttle * wq->wq_nthreads; in wq_init()
661 debug(1, "Using %d slots, %d threads\n", wq->wq_nwipslots, in wq_init()
662 wq->wq_nthreads); in wq_init()
664 wq->wq_next_batchid = 0; in wq_init()
667 pthread_mutex_init(&wq->wq_wip[i].wip_lock, NULL); in wq_init()
668 pthread_cond_init(&wq->wq_wip[i].wip_cv, NULL); in wq_init()
669 wq->wq_wip[i].wip_batchid = wq->wq_next_batchid++; in wq_init()
672 pthread_mutex_init(&wq->wq_queue_lock, NULL); in wq_init()
673 wq->wq_queue = fifo_new(); in wq_init()
674 pthread_cond_init(&wq->wq_work_avail, NULL); in wq_init()
675 pthread_cond_init(&wq->wq_work_removed, NULL); in wq_init()
676 wq->wq_ninqueue = nfiles; in wq_init()
677 wq->wq_nextpownum = 0; in wq_init()
679 pthread_mutex_init(&wq->wq_donequeue_lock, NULL); in wq_init()
680 wq->wq_donequeue = fifo_new(); in wq_init()
681 wq->wq_lastdonebatch = -1; in wq_init()
683 pthread_cond_init(&wq->wq_done_cv, NULL); in wq_init()
685 pthread_cond_init(&wq->wq_alldone_cv, NULL); in wq_init()
686 wq->wq_alldone = 0; in wq_init()
688 barrier_init(&wq->wq_bar1, wq->wq_nthreads); in wq_init()
689 barrier_init(&wq->wq_bar2, wq->wq_nthreads); in wq_init()
691 wq->wq_nomorefiles = 0; in wq_init()
706 for (i = 0; i < wq->wq_nthreads; i++) { in start_threads()
707 pthread_create(&wq->wq_thread[i], NULL, in start_threads()
728 for (i = 0; i < wq->wq_nthreads; i++) { in join_threads()
729 pthread_join(wq->wq_thread[i], NULL); in join_threads()
747 * hard-to-debug failure modes.
832 if (argc - optind != 2) in main()
844 if (argc - optind == 0) in main()
872 * so we shoe-horn a copier into ctfmerge. in main()
883 nifiles = argc - optind; in main()
948 iidesc_stats(mstrtd->td_iihash); in main()
969 if (CTF_V3_TYPE_ISCHILD(reftd->td_nextid)) in main()
972 savetd->td_nextid = withfile ? reftd->td_nextid : in main()
993 savetd->td_parlabel = xstrdup(parle->le_name); in main()
996 uniqname[MAXPATHLEN - 1] = '\0'; in main()
997 savetd->td_parname = xstrdup(basename(uniqname)); in main()
1002 * No post processing. Write the merged tree as-is into the in main()