diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index ed85bb16de..c5ee6971f1 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1282,14 +1282,59 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src); + +// Unchoke any filter graphs that are downstream of this node, to prevent it +// from getting stuck trying to push data to a full queue +static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) +{ + SchFilterGraph *fg; + SchDec *dec; + SchEnc *enc; + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + dec = &sch->dec[dst->idx]; + for (int i = 0; i < dec->nb_outputs; i++) + unchoke_downstream(sch, dec->outputs[i].dst); + break; + case SCH_NODE_TYPE_ENC: + enc = &sch->enc[dst->idx]; + for (int i = 0; i < enc->nb_dst; i++) + unchoke_downstream(sch, &enc->dst[i]); + break; + case SCH_NODE_TYPE_MUX: + // muxers are never choked + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + } else { + // ensure that this filter graph is not stuck waiting for + // input from a different upstream demuxer + unchoke_for_stream(sch, fg->inputs[fg->best_input].src); + } + break; + default: + av_unreachable("Invalid destination node type?"); + break; + } +} + static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) { while (1) { SchFilterGraph *fg; + SchDemux *demux; switch (src.type) { case SCH_NODE_TYPE_DEMUX: // fed directly by a demuxer (i.e. not through a filtergraph) - sch->demux[src.idx].waiter.choked_next = 0; + demux = &sch->demux[src.idx]; + if (demux->waiter.choked_next == 0) + return; // prevent infinite loop + demux->waiter.choked_next = 0; + for (int i = 0; i < demux->nb_streams; i++) + unchoke_downstream(sch, demux->streams[i].dst); return; case SCH_NODE_TYPE_DEC: src = sch->dec[src.idx].src;