diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 4f0d446007..3c5cffa594 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -189,7 +189,6 @@ typedef struct PreMuxQueue { typedef struct SchMuxStream { SchedulerNode src; - SchedulerNode src_sched; unsigned *sub_heartbeat_dst; unsigned nb_sub_heartbeat_dst; @@ -235,7 +234,6 @@ typedef struct SchMux { typedef struct SchFilterIn { SchedulerNode src; - SchedulerNode src_sched; int send_finished; int receive_finished; } SchFilterIn; @@ -1268,24 +1266,31 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) { while (1) { SchFilterGraph *fg; - - // fed directly by a demuxer (i.e. not through a filtergraph) - if (src.type == SCH_NODE_TYPE_DEMUX) { + switch (src.type) { + case SCH_NODE_TYPE_DEMUX: + // fed directly by a demuxer (i.e. not through a filtergraph) sch->demux[src.idx].waiter.choked_next = 0; return; - } - - av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT); - fg = &sch->filters[src.idx]; - - // the filtergraph contains internal sources and - // requested to be scheduled directly - if (fg->best_input == fg->nb_inputs) { - fg->waiter.choked_next = 0; + case SCH_NODE_TYPE_DEC: + src = sch->dec[src.idx].src; + continue; + case SCH_NODE_TYPE_ENC: + src = sch->enc[src.idx].src; + continue; + case SCH_NODE_TYPE_FILTER_OUT: + fg = &sch->filters[src.idx]; + // the filtergraph contains internal sources and + // requested to be scheduled directly + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + return; + } + src = fg->inputs[fg->best_input].src; + continue; + default: + av_unreachable("Invalid source node type?"); return; } - - src = fg->inputs[fg->best_input].src_sched; } } @@ -1328,7 +1333,7 @@ static void schedule_update_locked(Scheduler *sch) continue; // resolve the source to unchoke - unchoke_for_stream(sch, ms->src_sched); + unchoke_for_stream(sch, ms->src); have_unchoked = 1; } } @@ -1361,6 +1366,27 @@ enum { CYCLE_NODE_DONE, }; +// Finds the filtergraph or muxer upstream of a scheduler node +static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src) +{ + while (1) { + switch (src.type) { + case SCH_NODE_TYPE_DEMUX: + case SCH_NODE_TYPE_FILTER_OUT: + return src; + case SCH_NODE_TYPE_DEC: + src = sch->dec[src.idx].src; + continue; + case SCH_NODE_TYPE_ENC: + src = sch->enc[src.idx].src; + continue; + default: + av_unreachable("Invalid source node type?"); + return (SchedulerNode) {0}; + } + } +} + static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack) @@ -1377,22 +1403,23 @@ check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, // descend into every input, depth first if (src.idx_stream < fg->nb_inputs) { const SchFilterIn *fi = &fg->inputs[src.idx_stream++]; + SchedulerNode node = src_filtergraph(sch, fi->src); // connected to demuxer, no cycles possible - if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX) + if (node.type == SCH_NODE_TYPE_DEMUX) continue; // otherwise connected to another filtergraph - av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); + av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT); // found a cycle - if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED) + if (filters_visited[node.idx] == CYCLE_NODE_STARTED) return AVERROR(EINVAL); // place current position on stack and descend av_assert0(nb_filters_stack < sch->nb_filters); filters_stack[nb_filters_stack++] = src; - src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 }; + src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 }; continue; } @@ -1514,22 +1541,7 @@ static int start_prepare(Scheduler *sch) for (unsigned j = 0; j < mux->nb_streams; j++) { SchMuxStream *ms = &mux->streams[j]; - switch (ms->src.type) { - case SCH_NODE_TYPE_ENC: { - SchEnc *enc = &sch->enc[ms->src.idx]; - if (enc->src.type == SCH_NODE_TYPE_DEC) { - ms->src_sched = sch->dec[enc->src.idx].src; - av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); - } else { - ms->src_sched = enc->src; - av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); - } - break; - } - case SCH_NODE_TYPE_DEMUX: - ms->src_sched = ms->src; - break; - default: + if (!ms->src.type) { av_log(mux, AV_LOG_ERROR, "Muxer stream #%u not connected to a source\n", j); return AVERROR(EINVAL); @@ -1547,26 +1559,12 @@ static int start_prepare(Scheduler *sch) for (unsigned j = 0; j < fg->nb_inputs; j++) { SchFilterIn *fi = &fg->inputs[j]; - SchDec *dec; if (!fi->src.type) { av_log(fg, AV_LOG_ERROR, "Filtergraph input %u not connected to a source\n", j); return AVERROR(EINVAL); } - - if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT) - fi->src_sched = fi->src; - else { - av_assert0(fi->src.type == SCH_NODE_TYPE_DEC); - dec = &sch->dec[fi->src.idx]; - - switch (dec->src.type) { - case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break; - case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break; - default: av_assert0(0); - } - } } for (unsigned j = 0; j < fg->nb_outputs; j++) {