Lines Matching refs:pl

860 srlzer_enter(struct fmdump_pipeline *pl)  in srlzer_enter()  argument
862 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in srlzer_enter()
868 srlzer_exit(struct fmdump_pipeline *pl) in srlzer_exit() argument
870 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in srlzer_exit()
910 pipeline_stall(struct fmdump_pipeline *pl) in pipeline_stall() argument
912 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_stall()
915 (void) pthread_cond_wait(&pl->pl_cv, &srlzer->ds_lock); in pipeline_stall()
919 pipeline_continue(struct fmdump_pipeline *pl) in pipeline_continue() argument
921 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_continue()
924 (void) pthread_cond_signal(&srlzer->ds_pipearr[pl->pl_srlzeridx].pl_cv); in pipeline_continue()
936 pipeline_output(struct fmdump_pipeline *pl, const fmd_log_record_t *rp) in pipeline_output() argument
938 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_output()
941 int thisidx = pl->pl_srlzeridx; in pipeline_output()
974 if (wpl == pl) in pipeline_output()
984 pipeline_mark_consumed(struct fmdump_pipeline *pl) in pipeline_mark_consumed() argument
986 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_mark_consumed()
989 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_PROCESSING; in pipeline_mark_consumed()
994 pipeline_done(struct fmdump_pipeline *pl) in pipeline_done() argument
996 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_done()
999 srlzer_enter(pl); in pipeline_done()
1001 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_DONE; in pipeline_done()
1007 srlzer_exit(pl); in pipeline_done()
1011 pipeline_pollmode(struct fmdump_pipeline *pl) in pipeline_pollmode() argument
1013 struct fmdump_srlzer *srlzer = pl->pl_srlzer; in pipeline_pollmode()
1016 if (srlzer->ds_slot[pl->pl_srlzeridx].ss_state == FMDUMP_PIPE_POLLING) in pipeline_pollmode()
1019 srlzer_enter(pl); in pipeline_pollmode()
1021 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_POLLING; in pipeline_pollmode()
1026 srlzer_exit(pl); in pipeline_pollmode()
1032 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_err() local
1034 fmdump_warn("skipping record in %s: %s\n", pl->pl_processing, in pipeline_err()
1044 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_cb() local
1047 fmd_log_rec_f *func = pl->pl_arg.da_fmt->do_func; in pipeline_cb()
1049 srlzer_enter(pl); in pipeline_cb()
1051 if (!pipeline_output(pl, rp)) in pipeline_cb()
1052 pipeline_stall(pl); in pipeline_cb()
1054 rc = func(lp, rp, pl->pl_arg.da_fp); in pipeline_cb()
1055 pipeline_mark_consumed(pl); in pipeline_cb()
1057 srlzer_exit(pl); in pipeline_cb()
1063 pipeline_process(struct fmdump_pipeline *pl, char *logpath, boolean_t follow) in pipeline_process() argument
1070 pl->pl_processing = logpath; in pipeline_process()
1082 pl->pl_ops = logtypes[i].lt_ops; in pipeline_process()
1083 pl->pl_arg.da_fmt = in pipeline_process()
1084 &pl->pl_ops->do_formats[pl->pl_fmt]; in pipeline_process()
1089 if (pl->pl_ops == NULL) { in pipeline_process()
1097 if (fmd_log_xiter(lp, FMD_LOG_XITER_REFS, pl->pl_arg.da_fc, in pipeline_process()
1098 pl->pl_arg.da_fv, pipeline_cb, pipeline_err, (void *)pl, in pipeline_process()
1108 pipeline_pollmode(pl); in pipeline_process()
1120 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg; in pipeline_thr() local
1123 (void) pthread_mutex_lock(&pl->pl_lock); in pipeline_thr()
1124 pl->pl_started = 1; in pipeline_thr()
1125 (void) pthread_mutex_unlock(&pl->pl_lock); in pipeline_thr()
1126 (void) pthread_cond_signal(&pl->pl_cv); in pipeline_thr()
1128 for (ll = pl->pl_rotated; ll != NULL; ll = ll->next) in pipeline_thr()
1129 pipeline_process(pl, ll->path, B_FALSE); in pipeline_thr()
1131 pipeline_process(pl, pl->pl_logpath, pl->pl_follow); in pipeline_thr()
1132 pipeline_done(pl); in pipeline_thr()
1143 struct fmdump_pipeline *pipeline, *pl; in aggregate() local
1203 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) { in aggregate()
1204 (void) pthread_mutex_init(&pl->pl_lock, NULL); in aggregate()
1205 (void) pthread_cond_init(&pl->pl_cv, NULL); in aggregate()
1207 pl->pl_srlzer = &srlzer; in aggregate()
1208 pl->pl_srlzeridx = i; in aggregate()
1209 pl->pl_follow = opt_f ? B_TRUE : B_FALSE; in aggregate()
1210 pl->pl_fmt = fmt; in aggregate()
1211 pl->pl_arg.da_fv = fv; in aggregate()
1212 pl->pl_arg.da_fc = fc; in aggregate()
1213 pl->pl_arg.da_fp = stdout; in aggregate()
1215 (void) pthread_mutex_lock(&pl->pl_lock); in aggregate()
1217 if (pthread_create(&pl->pl_thr, NULL, in aggregate()
1218 pipeline_thr, (void *)pl) != 0) in aggregate()
1223 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) { in aggregate()
1224 while (!pl->pl_started) in aggregate()
1225 (void) pthread_cond_wait(&pl->pl_cv, &pl->pl_lock); in aggregate()
1227 (void) pthread_mutex_unlock(&pl->pl_lock); in aggregate()
1230 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) in aggregate()
1231 (void) pthread_join(pl->pl_thr, NULL); in aggregate()