Lines Matching refs:pl

751 srlzer_enter(struct fmdump_pipeline *pl)  in srlzer_enter()  argument
753 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in srlzer_enter()
759 srlzer_exit(struct fmdump_pipeline *pl) in srlzer_exit() argument
761 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in srlzer_exit()
801 pipeline_stall(struct fmdump_pipeline *pl) in pipeline_stall() argument
803 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_stall()
806 (void) pthread_cond_wait(&pl->pl_cv, &srlzer->ds_lock); in pipeline_stall()
810 pipeline_continue(struct fmdump_pipeline *pl) in pipeline_continue() argument
812 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_continue()
815 (void) pthread_cond_signal(&srlzer->ds_pipearr[pl->pl_srlzeridx].pl_cv); in pipeline_continue()
827 pipeline_output(struct fmdump_pipeline *pl, const fmd_log_record_t *rp) in pipeline_output() argument
829 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_output()
832 int thisidx = pl->pl_srlzeridx; in pipeline_output()
865 if (wpl == pl) in pipeline_output()
875 pipeline_mark_consumed(struct fmdump_pipeline *pl) in pipeline_mark_consumed() argument
877 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_mark_consumed()
880 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_PROCESSING; in pipeline_mark_consumed()
885 pipeline_done(struct fmdump_pipeline *pl) in pipeline_done() argument
887 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_done()
890 srlzer_enter(pl); in pipeline_done()
892 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_DONE; in pipeline_done()
898 srlzer_exit(pl); in pipeline_done()
902 pipeline_pollmode(struct fmdump_pipeline *pl) in pipeline_pollmode() argument
904 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_pollmode()
907 if (srlzer->ds_slot[pl->pl_srlzeridx].ss_state == FMDUMP_PIPE_POLLING) in pipeline_pollmode()
910 srlzer_enter(pl); in pipeline_pollmode()
912 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_POLLING; in pipeline_pollmode()
917 srlzer_exit(pl); in pipeline_pollmode()
923 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_err() local
925 fmdump_warn("skipping record in %s: %s\n", pl->pl_processing, in pipeline_err()
935 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_cb() local
938 fmd_log_rec_f *func = pl->pl_arg.da_fmt->do_func; in pipeline_cb()
940 srlzer_enter(pl); in pipeline_cb()
942 if (!pipeline_output(pl, rp)) in pipeline_cb()
943 pipeline_stall(pl); in pipeline_cb()
945 rc = func(lp, rp, pl->pl_arg.da_fp); in pipeline_cb()
946 pipeline_mark_consumed(pl); in pipeline_cb()
948 srlzer_exit(pl); in pipeline_cb()
954 pipeline_process(struct fmdump_pipeline *pl, char *logpath, boolean_t follow) in pipeline_process() argument
961 pl->pl_processing = logpath; in pipeline_process()
973 pl->pl_ops = logtypes[i].lt_ops; in pipeline_process()
974 pl->pl_arg.da_fmt = in pipeline_process()
975 &pl->pl_ops->do_formats[pl->pl_fmt]; in pipeline_process()
980 if (pl->pl_ops == NULL) { in pipeline_process()
988 if (fmd_log_xiter(lp, FMD_LOG_XITER_REFS, pl->pl_arg.da_fc, in pipeline_process()
989 pl->pl_arg.da_fv, pipeline_cb, pipeline_err, (void *)pl, in pipeline_process()
999 pipeline_pollmode(pl); in pipeline_process()
1011 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_thr() local
1014 (void) pthread_mutex_lock(&pl->pl_lock); in pipeline_thr()
1015 pl->pl_started = 1; in pipeline_thr()
1016 (void) pthread_mutex_unlock(&pl->pl_lock); in pipeline_thr()
1017 (void) pthread_cond_signal(&pl->pl_cv); in pipeline_thr()
1019 for (ll = pl->pl_rotated; ll != NULL; ll = ll->next) in pipeline_thr()
1020 pipeline_process(pl, ll->path, B_FALSE); in pipeline_thr()
1022 pipeline_process(pl, pl->pl_logpath, pl->pl_follow); in pipeline_thr()
1023 pipeline_done(pl); in pipeline_thr()
1034 struct fmdump_pipeline *pipeline, *pl; in aggregate() local
1094 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) { in aggregate()
1095 (void) pthread_mutex_init(&pl->pl_lock, NULL); in aggregate()
1096 (void) pthread_cond_init(&pl->pl_cv, NULL); in aggregate()
1098 pl->pl_srlzer = &srlzer; in aggregate()
1099 pl->pl_srlzeridx = i; in aggregate()
1100 pl->pl_follow = opt_f ? B_TRUE : B_FALSE; in aggregate()
1101 pl->pl_fmt = fmt; in aggregate()
1102 pl->pl_arg.da_fv = fv; in aggregate()
1103 pl->pl_arg.da_fc = fc; in aggregate()
1104 pl->pl_arg.da_fp = stdout; in aggregate()
1106 (void) pthread_mutex_lock(&pl->pl_lock); in aggregate()
1108 if (pthread_create(&pl->pl_thr, NULL, in aggregate()
1109 pipeline_thr, (void *)pl) != 0) in aggregate()
1114 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) { in aggregate()
1115 while (!pl->pl_started) in aggregate()
1116 (void) pthread_cond_wait(&pl->pl_cv, &pl->pl_lock); in aggregate()
1118 (void) pthread_mutex_unlock(&pl->pl_lock); in aggregate()
1121 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) in aggregate()
1122 (void) pthread_join(pl->pl_thr, NULL); in aggregate()