Merge pull request #20 from kkroening/feature/17

Add support for multi-output filters; implement `split` filter
This commit is contained in:
Karl Kroening 2017-07-12 02:13:35 -06:00 committed by GitHub
commit 7669492575
10 changed files with 658 additions and 245 deletions

2
.gitignore vendored
View File

@ -2,6 +2,6 @@
.eggs .eggs
.tox/ .tox/
dist/ dist/
ffmpeg/tests/sample_data/dummy2.mp4 ffmpeg/tests/sample_data/out*.mp4
ffmpeg_python.egg-info/ ffmpeg_python.egg-info/
venv* venv*

View File

@ -1,6 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from . import _filters, _ffmpeg, _run from . import _filters, _ffmpeg, _run
from ._filters import * from ._filters import *
from ._ffmpeg import * from ._ffmpeg import *
from ._run import * from ._run import *
__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ __all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__

View File

@ -1,10 +1,12 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from .nodes import ( from .nodes import (
FilterNode, filter_operator,
GlobalNode, GlobalNode,
InputNode, InputNode,
operator, MergeOutputsNode,
OutputNode, OutputNode,
output_operator,
) )
@ -19,27 +21,27 @@ def input(filename, **kwargs):
if 'format' in kwargs: if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs") raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt kwargs['format'] = fmt
return InputNode(input.__name__, **kwargs) return InputNode(input.__name__, kwargs=kwargs).stream()
@operator(node_classes={OutputNode, GlobalNode}) @output_operator()
def overwrite_output(parent_node): def overwrite_output(stream):
"""Overwrite output files without asking (ffmpeg ``-y`` option) """Overwrite output files without asking (ffmpeg ``-y`` option)
Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__ Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__
""" """
return GlobalNode(parent_node, overwrite_output.__name__) return GlobalNode(stream, overwrite_output.__name__).stream()
@operator(node_classes={OutputNode}) @output_operator()
def merge_outputs(*parent_nodes): def merge_outputs(*streams):
"""Include all given outputs in one ffmpeg command line """Include all given outputs in one ffmpeg command line
""" """
return OutputNode(parent_nodes, merge_outputs.__name__) return MergeOutputsNode(streams, merge_outputs.__name__).stream()
@operator(node_classes={InputNode, FilterNode}) @filter_operator()
def output(parent_node, filename, **kwargs): def output(stream, filename, **kwargs):
"""Output file URL """Output file URL
Official documentation: `Synopsis <https://ffmpeg.org/ffmpeg.html#Synopsis>`__ Official documentation: `Synopsis <https://ffmpeg.org/ffmpeg.html#Synopsis>`__
@ -50,7 +52,7 @@ def output(parent_node, filename, **kwargs):
if 'format' in kwargs: if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs") raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt kwargs['format'] = fmt
return OutputNode([parent_node], output.__name__, **kwargs) return OutputNode(stream, output.__name__, kwargs=kwargs).stream()

View File

@ -1,63 +1,59 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from .nodes import FilterNode, operator from .nodes import FilterNode, filter_operator
from ._utils import escape_chars from ._utils import escape_chars
@operator() @filter_operator()
def filter_(parent_node, filter_name, *args, **kwargs): def filter_multi_output(stream_spec, filter_name, *args, **kwargs):
"""Apply custom single-source filter. """Apply custom filter with one or more outputs.
This is the same as ``filter_`` except that the filter can produce more than one output.
To reference an output stream, use either the ``.stream`` operator or bracket shorthand:
Example:
```
split = ffmpeg.input('in.mp4').filter_multi_output('split')
split0 = split.stream(0)
split1 = split[1]
ffmpeg.concat(split0, split1).output('out.mp4').run()
```
"""
return FilterNode(stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None)
@filter_operator()
def filter_(stream_spec, filter_name, *args, **kwargs):
"""Apply custom filter.
``filter_`` is normally used by higher-level filter functions such as ``hflip``, but if a filter implementation ``filter_`` is normally used by higher-level filter functions such as ``hflip``, but if a filter implementation
is missing from ``fmpeg-python``, you can call ``filter_`` directly to have ``fmpeg-python`` pass the filter name is missing from ``fmpeg-python``, you can call ``filter_`` directly to have ``fmpeg-python`` pass the filter name
and arguments to ffmpeg verbatim. and arguments to ffmpeg verbatim.
Args: Args:
parent_node: Source stream to apply filter to. stream_spec: a Stream, list of Streams, or label-to-Stream dictionary mapping
filter_name: ffmpeg filter name, e.g. `colorchannelmixer` filter_name: ffmpeg filter name, e.g. `colorchannelmixer`
*args: list of args to pass to ffmpeg verbatim *args: list of args to pass to ffmpeg verbatim
**kwargs: list of keyword-args to pass to ffmpeg verbatim **kwargs: list of keyword-args to pass to ffmpeg verbatim
This function is used internally by all of the other single-source filters (e.g. ``hflip``, ``crop``, etc.).
For custom multi-source filters, see ``filter_multi`` instead.
The function name is suffixed with ``_`` in order avoid confusion with the standard python ``filter`` function. The function name is suffixed with ``_`` in order avoid confusion with the standard python ``filter`` function.
Example: Example:
``ffmpeg.input('in.mp4').filter_('hflip').output('out.mp4').run()`` ``ffmpeg.input('in.mp4').filter_('hflip').output('out.mp4').run()``
""" """
return FilterNode([parent_node], filter_name, *args, **kwargs) return filter_multi_output(stream_spec, filter_name, *args, **kwargs).stream()
def filter_multi(parent_nodes, filter_name, *args, **kwargs): @filter_operator()
"""Apply custom multi-source filter. def split(stream):
return FilterNode(stream, split.__name__)
This is nearly identical to the ``filter`` function except that it allows filters to be applied to multiple
streams. It's normally used by higher-level filter functions such as ``concat``, but if a filter implementation
is missing from ``fmpeg-python``, you can call ``filter_multi`` directly.
Note that because it applies to multiple streams, it can't be used as an operator, unlike the ``filter`` function
(e.g. ``ffmpeg.input('in.mp4').filter_('hflip')``)
Args:
parent_nodes: List of source streams to apply filter to.
filter_name: ffmpeg filter name, e.g. `concat`
*args: list of args to pass to ffmpeg verbatim
**kwargs: list of keyword-args to pass to ffmpeg verbatim
For custom single-source filters, see ``filter_multi`` instead.
Example:
``ffmpeg.filter_multi(ffmpeg.input('in1.mp4'), ffmpeg.input('in2.mp4'), 'concat', n=2).output('out.mp4').run()``
"""
return FilterNode(parent_nodes, filter_name, *args, **kwargs)
@filter_operator()
@operator() def setpts(stream, expr):
def setpts(parent_node, expr):
"""Change the PTS (presentation timestamp) of the input frames. """Change the PTS (presentation timestamp) of the input frames.
Args: Args:
@ -65,11 +61,11 @@ def setpts(parent_node, expr):
Official documentation: `setpts, asetpts <https://ffmpeg.org/ffmpeg-filters.html#setpts_002c-asetpts>`__ Official documentation: `setpts, asetpts <https://ffmpeg.org/ffmpeg-filters.html#setpts_002c-asetpts>`__
""" """
return filter_(parent_node, setpts.__name__, expr) return FilterNode(stream, setpts.__name__, args=[expr]).stream()
@operator() @filter_operator()
def trim(parent_node, **kwargs): def trim(stream, **kwargs):
"""Trim the input so that the output contains one continuous subpart of the input. """Trim the input so that the output contains one continuous subpart of the input.
Args: Args:
@ -87,10 +83,10 @@ def trim(parent_node, **kwargs):
Official documentation: `trim <https://ffmpeg.org/ffmpeg-filters.html#trim>`__ Official documentation: `trim <https://ffmpeg.org/ffmpeg-filters.html#trim>`__
""" """
return filter_(parent_node, trim.__name__, **kwargs) return FilterNode(stream, trim.__name__, kwargs=kwargs).stream()
@operator() @filter_operator()
def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs): def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs):
"""Overlay one video on top of another. """Overlay one video on top of another.
@ -135,29 +131,29 @@ def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs
Official documentation: `overlay <https://ffmpeg.org/ffmpeg-filters.html#overlay-1>`__ Official documentation: `overlay <https://ffmpeg.org/ffmpeg-filters.html#overlay-1>`__
""" """
kwargs['eof_action'] = eof_action kwargs['eof_action'] = eof_action
return filter_multi([main_parent_node, overlay_parent_node], overlay.__name__, **kwargs) return FilterNode([main_parent_node, overlay_parent_node], overlay.__name__, kwargs=kwargs, max_inputs=2).stream()
@operator() @filter_operator()
def hflip(parent_node): def hflip(stream):
"""Flip the input video horizontally. """Flip the input video horizontally.
Official documentation: `hflip <https://ffmpeg.org/ffmpeg-filters.html#hflip>`__ Official documentation: `hflip <https://ffmpeg.org/ffmpeg-filters.html#hflip>`__
""" """
return filter_(parent_node, hflip.__name__) return FilterNode(stream, hflip.__name__).stream()
@operator() @filter_operator()
def vflip(parent_node): def vflip(stream):
"""Flip the input video vertically. """Flip the input video vertically.
Official documentation: `vflip <https://ffmpeg.org/ffmpeg-filters.html#vflip>`__ Official documentation: `vflip <https://ffmpeg.org/ffmpeg-filters.html#vflip>`__
""" """
return filter_(parent_node, vflip.__name__) return FilterNode(stream, vflip.__name__).stream()
@operator() @filter_operator()
def drawbox(parent_node, x, y, width, height, color, thickness=None, **kwargs): def drawbox(stream, x, y, width, height, color, thickness=None, **kwargs):
"""Draw a colored box on the input image. """Draw a colored box on the input image.
Args: Args:
@ -178,11 +174,11 @@ def drawbox(parent_node, x, y, width, height, color, thickness=None, **kwargs):
""" """
if thickness: if thickness:
kwargs['t'] = thickness kwargs['t'] = thickness
return filter_(parent_node, drawbox.__name__, x, y, width, height, color, **kwargs) return FilterNode(stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs).stream()
@operator() @filter_operator()
def drawtext(parent_node, text=None, x=0, y=0, escape_text=True, **kwargs): def drawtext(stream, text=None, x=0, y=0, escape_text=True, **kwargs):
"""Draw a text string or text from a specified file on top of a video, using the libfreetype library. """Draw a text string or text from a specified file on top of a video, using the libfreetype library.
To enable compilation of this filter, you need to configure FFmpeg with ``--enable-libfreetype``. To enable default To enable compilation of this filter, you need to configure FFmpeg with ``--enable-libfreetype``. To enable default
@ -320,11 +316,11 @@ def drawtext(parent_node, text=None, x=0, y=0, escape_text=True, **kwargs):
kwargs['x'] = x kwargs['x'] = x
if y != 0: if y != 0:
kwargs['y'] = y kwargs['y'] = y
return filter_(parent_node, drawtext.__name__, **kwargs) return filter_(stream, drawtext.__name__, **kwargs)
@operator() @filter_operator()
def concat(*parent_nodes, **kwargs): def concat(*streams, **kwargs):
"""Concatenate audio and video streams, joining them together one after the other. """Concatenate audio and video streams, joining them together one after the other.
The filter works on segments of synchronized video and audio streams. All segments must have the same number of The filter works on segments of synchronized video and audio streams. All segments must have the same number of
@ -349,12 +345,12 @@ def concat(*parent_nodes, **kwargs):
Official documentation: `concat <https://ffmpeg.org/ffmpeg-filters.html#concat>`__ Official documentation: `concat <https://ffmpeg.org/ffmpeg-filters.html#concat>`__
""" """
kwargs['n'] = len(parent_nodes) kwargs['n'] = len(streams)
return filter_multi(parent_nodes, concat.__name__, **kwargs) return FilterNode(streams, concat.__name__, kwargs=kwargs, max_inputs=None).stream()
@operator() @filter_operator()
def zoompan(parent_node, **kwargs): def zoompan(stream, **kwargs):
"""Apply Zoom & Pan effect. """Apply Zoom & Pan effect.
Args: Args:
@ -369,11 +365,11 @@ def zoompan(parent_node, **kwargs):
Official documentation: `zoompan <https://ffmpeg.org/ffmpeg-filters.html#zoompan>`__ Official documentation: `zoompan <https://ffmpeg.org/ffmpeg-filters.html#zoompan>`__
""" """
return filter_(parent_node, zoompan.__name__, **kwargs) return FilterNode(stream, zoompan.__name__, kwargs=kwargs).stream()
@operator() @filter_operator()
def hue(parent_node, **kwargs): def hue(stream, **kwargs):
"""Modify the hue and/or the saturation of the input. """Modify the hue and/or the saturation of the input.
Args: Args:
@ -384,16 +380,16 @@ def hue(parent_node, **kwargs):
Official documentation: `hue <https://ffmpeg.org/ffmpeg-filters.html#hue>`__ Official documentation: `hue <https://ffmpeg.org/ffmpeg-filters.html#hue>`__
""" """
return filter_(parent_node, hue.__name__, **kwargs) return FilterNode(stream, hue.__name__, kwargs=kwargs).stream()
@operator() @filter_operator()
def colorchannelmixer(parent_node, *args, **kwargs): def colorchannelmixer(stream, *args, **kwargs):
"""Adjust video input frames by re-mixing color channels. """Adjust video input frames by re-mixing color channels.
Official documentation: `colorchannelmixer <https://ffmpeg.org/ffmpeg-filters.html#colorchannelmixer>`__ Official documentation: `colorchannelmixer <https://ffmpeg.org/ffmpeg-filters.html#colorchannelmixer>`__
""" """
return filter_(parent_node, colorchannelmixer.__name__, **kwargs) return FilterNode(stream, colorchannelmixer.__name__, kwargs=kwargs).stream()
__all__ = [ __all__ = [
@ -401,7 +397,6 @@ __all__ = [
'concat', 'concat',
'drawbox', 'drawbox',
'filter_', 'filter_',
'filter_multi',
'hflip', 'hflip',
'hue', 'hue',
'overlay', 'overlay',

View File

@ -1,24 +1,27 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from .dag import get_outgoing_edges, topo_sort
from functools import reduce from functools import reduce
from past.builtins import basestring from past.builtins import basestring
import copy import copy
import operator as _operator import operator
import subprocess as _subprocess import subprocess as _subprocess
from ._ffmpeg import ( from ._ffmpeg import (
input, input,
merge_outputs,
output, output,
overwrite_output, overwrite_output,
) )
from .nodes import ( from .nodes import (
get_stream_spec_nodes,
FilterNode,
GlobalNode, GlobalNode,
InputNode, InputNode,
operator,
OutputNode, OutputNode,
output_operator,
) )
def _get_stream_name(name): def _get_stream_name(name):
return '[{}]'.format(name) return '[{}]'.format(name)
@ -34,8 +37,8 @@ def _convert_kwargs_to_cmd_line_args(kwargs):
def _get_input_args(input_node): def _get_input_args(input_node):
if input_node._name == input.__name__: if input_node.name == input.__name__:
kwargs = copy.copy(input_node._kwargs) kwargs = copy.copy(input_node.kwargs)
filename = kwargs.pop('filename') filename = kwargs.pop('filename')
fmt = kwargs.pop('format', None) fmt = kwargs.pop('format', None)
video_size = kwargs.pop('video_size', None) video_size = kwargs.pop('video_size', None)
@ -51,97 +54,95 @@ def _get_input_args(input_node):
return args return args
def _topo_sort(start_node): def _get_filter_spec(node, outgoing_edge_map, stream_name_map):
marked_nodes = [] incoming_edges = node.incoming_edges
sorted_nodes = [] outgoing_edges = get_outgoing_edges(node, outgoing_edge_map)
child_map = {} inputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in incoming_edges]
def visit(node, child): outputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in outgoing_edges]
if node in marked_nodes: filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs))
raise RuntimeError('Graph is not a DAG')
if child is not None:
if node not in child_map:
child_map[node] = []
child_map[node].append(child)
if node not in sorted_nodes:
marked_nodes.append(node)
[visit(parent, node) for parent in node._parents]
marked_nodes.remove(node)
sorted_nodes.append(node)
unmarked_nodes = [start_node]
while unmarked_nodes:
visit(unmarked_nodes.pop(), None)
return sorted_nodes, child_map
def _get_filter_spec(i, node, stream_name_map):
stream_name = _get_stream_name('v{}'.format(i))
stream_name_map[node] = stream_name
inputs = [stream_name_map[parent] for parent in node._parents]
filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(), stream_name)
return filter_spec return filter_spec
def _get_filter_arg(filter_nodes, stream_name_map): def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map):
filter_specs = [_get_filter_spec(i, node, stream_name_map) for i, node in enumerate(filter_nodes)] stream_count = 0
for upstream_node in filter_nodes:
outgoing_edge_map = outgoing_edge_maps[upstream_node]
for upstream_label, downstreams in list(outgoing_edge_map.items()):
if len(downstreams) > 1:
# TODO: automatically insert `splits` ahead of time via graph transformation.
raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
'`split` filter is probably required'.format(upstream_node, upstream_label))
stream_name_map[upstream_node, upstream_label] = _get_stream_name('s{}'.format(stream_count))
stream_count += 1
def _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map):
_allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map)
filter_specs = [_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map) for node in filter_nodes]
return ';'.join(filter_specs) return ';'.join(filter_specs)
def _get_global_args(node): def _get_global_args(node):
if node._name == overwrite_output.__name__: if node.name == overwrite_output.__name__:
return ['-y'] return ['-y']
else: else:
raise ValueError('Unsupported global node: {}'.format(node)) raise ValueError('Unsupported global node: {}'.format(node))
def _get_output_args(node, stream_name_map): def _get_output_args(node, stream_name_map):
if node.name != output.__name__:
raise ValueError('Unsupported output node: {}'.format(node))
args = [] args = []
if node._name != merge_outputs.__name__: assert len(node.incoming_edges) == 1
stream_name = stream_name_map[node._parents[0]] edge = node.incoming_edges[0]
stream_name = stream_name_map[edge.upstream_node, edge.upstream_label]
if stream_name != '[0]': if stream_name != '[0]':
args += ['-map', stream_name] args += ['-map', stream_name]
if node._name == output.__name__: kwargs = copy.copy(node.kwargs)
kwargs = copy.copy(node._kwargs)
filename = kwargs.pop('filename') filename = kwargs.pop('filename')
fmt = kwargs.pop('format', None) fmt = kwargs.pop('format', None)
if fmt: if fmt:
args += ['-f', fmt] args += ['-f', fmt]
args += _convert_kwargs_to_cmd_line_args(kwargs) args += _convert_kwargs_to_cmd_line_args(kwargs)
args += [filename] args += [filename]
else:
raise ValueError('Unsupported output node: {}'.format(node))
return args return args
@operator(node_classes={OutputNode, GlobalNode}) @output_operator()
def get_args(node): def get_args(stream_spec, overwrite_output=False):
"""Get command-line arguments for ffmpeg.""" """Get command-line arguments for ffmpeg."""
nodes = get_stream_spec_nodes(stream_spec)
args = [] args = []
# TODO: group nodes together, e.g. `-i somefile -r somerate`. # TODO: group nodes together, e.g. `-i somefile -r somerate`.
sorted_nodes, child_map = _topo_sort(node) sorted_nodes, outgoing_edge_maps = topo_sort(nodes)
del(node)
input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)] input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)]
output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)]
isinstance(node, GlobalNode)]
global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)] global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)]
filter_nodes = [node for node in sorted_nodes if node not in (input_nodes + output_nodes + global_nodes)] filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)]
stream_name_map = {node: _get_stream_name(i) for i, node in enumerate(input_nodes)} stream_name_map = {(node, None): _get_stream_name(i) for i, node in enumerate(input_nodes)}
filter_arg = _get_filter_arg(filter_nodes, stream_name_map) filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map)
args += reduce(_operator.add, [_get_input_args(node) for node in input_nodes]) args += reduce(operator.add, [_get_input_args(node) for node in input_nodes])
if filter_arg: if filter_arg:
args += ['-filter_complex', filter_arg] args += ['-filter_complex', filter_arg]
args += reduce(_operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]) args += reduce(operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes])
args += reduce(_operator.add, [_get_global_args(node) for node in global_nodes], []) args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], [])
if overwrite_output:
args += ['-y']
return args return args
@operator(node_classes={OutputNode, GlobalNode}) @output_operator()
def run(node, cmd='ffmpeg'): def run(stream_spec, cmd='ffmpeg', **kwargs):
"""Run ffmpeg on node graph.""" """Run ffmpeg on node graph.
Args:
**kwargs: keyword-arguments passed to ``get_args()`` (e.g. ``overwrite_output=True``).
"""
if isinstance(cmd, basestring): if isinstance(cmd, basestring):
cmd = [cmd] cmd = [cmd]
elif type(cmd) != list: elif type(cmd) != list:
cmd = list(cmd) cmd = list(cmd)
args = cmd + node.get_args() args = cmd + get_args(stream_spec, **kwargs)
_subprocess.check_call(args) _subprocess.check_call(args)

View File

@ -1,4 +1,34 @@
from __future__ import unicode_literals
from builtins import str from builtins import str
from past.builtins import basestring
import hashlib
def _recursive_repr(item):
"""Hack around python `repr` to deterministically represent dictionaries.
This is able to represent more things than json.dumps, since it does not require things to be JSON serializable
(e.g. datetimes).
"""
if isinstance(item, basestring):
result = str(item)
elif isinstance(item, list):
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
elif isinstance(item, dict):
kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)]
result = '{' + ', '.join(kv_pairs) + '}'
else:
result = repr(item)
return result
def get_hash(item):
repr_ = _recursive_repr(item).encode('utf-8')
return hashlib.md5(repr_).hexdigest()
def get_hash_int(item):
return int(get_hash(item), base=16)
def escape_chars(text, chars): def escape_chars(text, chars):

177
ffmpeg/dag.py Normal file
View File

@ -0,0 +1,177 @@
from __future__ import unicode_literals
from ._utils import get_hash, get_hash_int
from builtins import object
from collections import namedtuple
class DagNode(object):
"""Node in a directed-acyclic graph (DAG).
Edges:
DagNodes are connected by edges. An edge connects two nodes with a label for each side:
- ``upstream_node``: upstream/parent node
- ``upstream_label``: label on the outgoing side of the upstream node
- ``downstream_node``: downstream/child node
- ``downstream_label``: label on the incoming side of the downstream node
For example, DagNode A may be connected to DagNode B with an edge labelled "foo" on A's side, and "bar" on B's
side:
_____ _____
| | | |
| A >[foo]---[bar]> B |
|_____| |_____|
Edge labels may be integers or strings, and nodes cannot have more than one incoming edge with the same label.
DagNodes may have any number of incoming edges and any number of outgoing edges. DagNodes keep track only of
their incoming edges, but the entire graph structure can be inferred by looking at the furthest downstream
nodes and working backwards.
Hashing:
DagNodes must be hashable, and two nodes are considered to be equivalent if they have the same hash value.
Nodes are immutable, and the hash should remain constant as a result. If a node with new contents is required,
create a new node and throw the old one away.
String representation:
In order for graph visualization tools to show useful information, nodes must be representable as strings. The
``repr`` operator should provide a more or less "full" representation of the node, and the ``short_repr``
property should be a shortened, concise representation.
Again, because nodes are immutable, the string representations should remain constant.
"""
def __hash__(self):
"""Return an integer hash of the node."""
raise NotImplementedError()
def __eq__(self, other):
"""Compare two nodes; implementations should return True if (and only if) hashes match."""
raise NotImplementedError()
def __repr__(self, other):
"""Return a full string representation of the node."""
raise NotImplementedError()
@property
def short_repr(self):
"""Return a partial/concise representation of the node."""
raise NotImplementedError()
@property
def incoming_edge_map(self):
"""Provides information about all incoming edges that connect to this node.
The edge map is a dictionary that maps an ``incoming_label`` to ``(outgoing_node, outgoing_label)``. Note that
implicity, ``incoming_node`` is ``self``. See "Edges" section above.
"""
raise NotImplementedError()
DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label'])
def get_incoming_edges(downstream_node, incoming_edge_map):
edges = []
for downstream_label, (upstream_node, upstream_label) in list(incoming_edge_map.items()):
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)]
return edges
def get_outgoing_edges(upstream_node, outgoing_edge_map):
edges = []
for upstream_label, downstream_infos in list(outgoing_edge_map.items()):
for (downstream_node, downstream_label) in downstream_infos:
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)]
return edges
class KwargReprNode(DagNode):
"""A DagNode that can be represented as a set of args+kwargs.
"""
@property
def __upstream_hashes(self):
hashes = []
for downstream_label, (upstream_node, upstream_label) in list(self.incoming_edge_map.items()):
hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label]]
return hashes
@property
def __inner_hash(self):
props = {'args': self.args, 'kwargs': self.kwargs}
return get_hash(props)
def __get_hash(self):
hashes = self.__upstream_hashes + [self.__inner_hash]
return get_hash_int(hashes)
def __init__(self, incoming_edge_map, name, args, kwargs):
self.__incoming_edge_map = incoming_edge_map
self.name = name
self.args = args
self.kwargs = kwargs
self.__hash = self.__get_hash()
def __hash__(self):
return self.__hash
def __eq__(self, other):
return hash(self) == hash(other)
@property
def short_hash(self):
return '{:x}'.format(abs(hash(self)))[:12]
def long_repr(self, include_hash=True):
formatted_props = ['{!r}'.format(arg) for arg in self.args]
formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)]
out = '{}({})'.format(self.name, ', '.join(formatted_props))
if include_hash:
out += ' <{}>'.format(self.short_hash)
return out
def __repr__(self):
return self.long_repr()
@property
def incoming_edges(self):
return get_incoming_edges(self, self.incoming_edge_map)
@property
def incoming_edge_map(self):
return self.__incoming_edge_map
@property
def short_repr(self):
return self.name
def topo_sort(downstream_nodes):
marked_nodes = []
sorted_nodes = []
outgoing_edge_maps = {}
def visit(upstream_node, upstream_label, downstream_node, downstream_label):
if upstream_node in marked_nodes:
raise RuntimeError('Graph is not a DAG')
if downstream_node is not None:
outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {})
outgoing_edge_infos = outgoing_edge_map.get(upstream_label, [])
outgoing_edge_infos += [(downstream_node, downstream_label)]
outgoing_edge_map[upstream_label] = outgoing_edge_infos
outgoing_edge_maps[upstream_node] = outgoing_edge_map
if upstream_node not in sorted_nodes:
marked_nodes.append(upstream_node)
for edge in upstream_node.incoming_edges:
visit(edge.upstream_node, edge.upstream_label, edge.downstream_node, edge.downstream_label)
marked_nodes.remove(upstream_node)
sorted_nodes.append(upstream_node)
unmarked_nodes = [(node, None) for node in downstream_nodes]
while unmarked_nodes:
upstream_node, upstream_label = unmarked_nodes.pop()
visit(upstream_node, upstream_label, None, None)
return sorted_nodes, outgoing_edge_maps

View File

@ -1,88 +1,238 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from ._utils import escape_chars from .dag import KwargReprNode
from ._utils import escape_chars, get_hash_int
from builtins import object from builtins import object
import hashlib import os
import json
class Node(object): def _is_of_types(obj, types):
"""Node base""" valid = False
def __init__(self, parents, name, *args, **kwargs): for stream_type in types:
parent_hashes = [hash(parent) for parent in parents] if isinstance(obj, stream_type):
if len(parent_hashes) != len(set(parent_hashes)): valid = True
raise ValueError('Same node cannot be included as parent multiple times') break
self._parents = parents return valid
self._hash = None
self._name = name
self._args = args
self._kwargs = kwargs
def __repr__(self):
formatted_props = ['{}'.format(arg) for arg in self._args] def _get_types_str(types):
formatted_props += ['{}={!r}'.format(key, self._kwargs[key]) for key in sorted(self._kwargs)] return ', '.join(['{}.{}'.format(x.__module__, x.__name__) for x in types])
return '{}({})'.format(self._name, ','.join(formatted_props))
class Stream(object):
"""Represents the outgoing edge of an upstream node; may be used to create more downstream nodes."""
def __init__(self, upstream_node, upstream_label, node_types):
if not _is_of_types(upstream_node, node_types):
raise TypeError('Expected upstream node to be of one of the following type(s): {}; got {}'.format(
_get_types_str(node_types), type(upstream_node)))
self.node = upstream_node
self.label = upstream_label
def __hash__(self): def __hash__(self):
if self._hash is None: return get_hash_int([hash(self.node), hash(self.label)])
self._update_hash()
return self._hash
def __eq__(self, other): def __eq__(self, other):
return hash(self) == hash(other) return hash(self) == hash(other)
def _update_hash(self): def __repr__(self):
props = {'args': self._args, 'kwargs': self._kwargs} node_repr = self.node.long_repr(include_hash=False)
props_str = json.dumps(props, sort_keys=True).encode('utf-8') out = '{}[{!r}] <{}>'.format(node_repr, self.label, self.node.short_hash)
my_hash = hashlib.md5(props_str).hexdigest() return out
parent_hashes = [str(hash(parent)) for parent in self._parents]
hashes = parent_hashes + [my_hash]
hashes_str = ','.join(hashes).encode('utf-8') def get_stream_map(stream_spec):
hash_str = hashlib.md5(hashes_str).hexdigest() if stream_spec is None:
self._hash = int(hash_str, base=16) stream_map = {}
elif isinstance(stream_spec, Stream):
stream_map = {None: stream_spec}
elif isinstance(stream_spec, (list, tuple)):
stream_map = dict(enumerate(stream_spec))
elif isinstance(stream_spec, dict):
stream_map = stream_spec
return stream_map
def get_stream_map_nodes(stream_map):
nodes = []
for stream in list(stream_map.values()):
if not isinstance(stream, Stream):
raise TypeError('Expected Stream; got {}'.format(type(stream)))
nodes.append(stream.node)
return nodes
def get_stream_spec_nodes(stream_spec):
stream_map = get_stream_map(stream_spec)
return get_stream_map_nodes(stream_map)
class Node(KwargReprNode):
"""Node base"""
@classmethod
def __check_input_len(cls, stream_map, min_inputs, max_inputs):
if min_inputs is not None and len(stream_map) < min_inputs:
raise ValueError('Expected at least {} input stream(s); got {}'.format(min_inputs, len(stream_map)))
elif max_inputs is not None and len(stream_map) > max_inputs:
raise ValueError('Expected at most {} input stream(s); got {}'.format(max_inputs, len(stream_map)))
@classmethod
def __check_input_types(cls, stream_map, incoming_stream_types):
for stream in list(stream_map.values()):
if not _is_of_types(stream, incoming_stream_types):
raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}'
.format(_get_types_str(incoming_stream_types), type(stream)))
@classmethod
def __get_incoming_edge_map(cls, stream_map):
incoming_edge_map = {}
for downstream_label, upstream in list(stream_map.items()):
incoming_edge_map[downstream_label] = (upstream.node, upstream.label)
return incoming_edge_map
def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args=[],
kwargs={}):
stream_map = get_stream_map(stream_spec)
self.__check_input_len(stream_map, min_inputs, max_inputs)
self.__check_input_types(stream_map, incoming_stream_types)
incoming_edge_map = self.__get_incoming_edge_map(stream_map)
super(Node, self).__init__(incoming_edge_map, name, args, kwargs)
self.__outgoing_stream_type = outgoing_stream_type
def stream(self, label=None):
"""Create an outgoing stream originating from this node.
More nodes may be attached onto the outgoing stream.
"""
return self.__outgoing_stream_type(self, label)
def __getitem__(self, label):
"""Create an outgoing stream originating from this node; syntactic sugar for ``self.stream(label)``.
"""
return self.stream(label)
class FilterableStream(Stream):
def __init__(self, upstream_node, upstream_label):
super(FilterableStream, self).__init__(upstream_node, upstream_label, {InputNode, FilterNode})
class InputNode(Node): class InputNode(Node):
"""InputNode type""" """InputNode type"""
def __init__(self, name, *args, **kwargs): def __init__(self, name, args=[], kwargs={}):
super(InputNode, self).__init__(parents=[], name=name, *args, **kwargs) super(InputNode, self).__init__(
stream_spec=None,
name=name,
incoming_stream_types={},
outgoing_stream_type=FilterableStream,
min_inputs=0,
max_inputs=0,
args=args,
kwargs=kwargs
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
class FilterNode(Node): class FilterNode(Node):
def __init__(self, stream_spec, name, max_inputs=1, args=[], kwargs={}):
super(FilterNode, self).__init__(
stream_spec=stream_spec,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=FilterableStream,
min_inputs=1,
max_inputs=max_inputs,
args=args,
kwargs=kwargs
)
"""FilterNode""" """FilterNode"""
def _get_filter(self): def _get_filter(self, outgoing_edges):
args = [escape_chars(x, '\\\'=:') for x in self._args] args = self.args
kwargs = {} kwargs = self.kwargs
for k, v in self._kwargs.items(): if self.name == 'split':
args = [len(outgoing_edges)]
out_args = [escape_chars(x, '\\\'=:') for x in args]
out_kwargs = {}
for k, v in list(kwargs.items()):
k = escape_chars(k, '\\\'=:') k = escape_chars(k, '\\\'=:')
v = escape_chars(v, '\\\'=:') v = escape_chars(v, '\\\'=:')
kwargs[k] = v out_kwargs[k] = v
arg_params = [escape_chars(v, '\\\'=:') for v in args] arg_params = [escape_chars(v, '\\\'=:') for v in out_args]
kwarg_params = ['{}={}'.format(k, kwargs[k]) for k in sorted(kwargs)] kwarg_params = ['{}={}'.format(k, out_kwargs[k]) for k in sorted(out_kwargs)]
params = arg_params + kwarg_params params = arg_params + kwarg_params
params_text = escape_chars(self._name, '\\\'=:') params_text = escape_chars(self.name, '\\\'=:')
if params: if params:
params_text += '={}'.format(':'.join(params)) params_text += '={}'.format(':'.join(params))
return escape_chars(params_text, '\\\'[],;') return escape_chars(params_text, '\\\'[],;')
class OutputNode(Node): class OutputNode(Node):
"""OutputNode""" def __init__(self, stream, name, args=[], kwargs={}):
pass super(OutputNode, self).__init__(
stream_spec=stream,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=1,
args=args,
kwargs=kwargs
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
class OutputStream(Stream):
def __init__(self, upstream_node, upstream_label):
super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode, MergeOutputsNode})
class MergeOutputsNode(Node):
def __init__(self, streams, name):
super(MergeOutputsNode, self).__init__(
stream_spec=streams,
name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None
)
class GlobalNode(Node): class GlobalNode(Node):
def __init__(self, parent, name, *args, **kwargs): def __init__(self, stream, name, args=[], kwargs={}):
if not isinstance(parent, OutputNode): super(GlobalNode, self).__init__(
raise RuntimeError('Global nodes can only be attached after output nodes') stream_spec=stream,
super(GlobalNode, self).__init__([parent], name, *args, **kwargs) name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=1,
args=args,
kwargs=kwargs
)
def operator(node_classes={Node}, name=None): def stream_operator(stream_classes={Stream}, name=None):
def decorator(func): def decorator(func):
func_name = name or func.__name__ func_name = name or func.__name__
[setattr(node_class, func_name, func) for node_class in node_classes] [setattr(stream_class, func_name, func) for stream_class in stream_classes]
return func return func
return decorator return decorator
def filter_operator(name=None):
return stream_operator(stream_classes={FilterableStream}, name=name)
def output_operator(name=None):
return stream_operator(stream_classes={OutputStream}, name=name)

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from builtins import bytes
from builtins import range
import ffmpeg import ffmpeg
import os import os
import pytest import pytest
@ -10,9 +12,10 @@ import subprocess
TEST_DIR = os.path.dirname(__file__) TEST_DIR = os.path.dirname(__file__)
SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data') SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data')
TEST_INPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'dummy.mp4') TEST_INPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'in1.mp4')
TEST_OVERLAY_FILE = os.path.join(SAMPLE_DATA_DIR, 'overlay.png') TEST_OVERLAY_FILE = os.path.join(SAMPLE_DATA_DIR, 'overlay.png')
TEST_OUTPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4') TEST_OUTPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'out1.mp4')
TEST_OUTPUT_FILE2 = os.path.join(SAMPLE_DATA_DIR, 'out2.mp4')
subprocess.check_call(['ffmpeg', '-version']) subprocess.check_call(['ffmpeg', '-version'])
@ -48,11 +51,8 @@ def test_fluent_concat():
concat1 = ffmpeg.concat(trimmed1, trimmed2, trimmed3) concat1 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat2 = ffmpeg.concat(trimmed1, trimmed2, trimmed3) concat2 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat3 = ffmpeg.concat(trimmed1, trimmed3, trimmed2) concat3 = ffmpeg.concat(trimmed1, trimmed3, trimmed2)
concat4 = ffmpeg.concat()
concat5 = ffmpeg.concat()
assert concat1 == concat2 assert concat1 == concat2
assert concat1 != concat3 assert concat1 != concat3
assert concat4 == concat5
def test_fluent_output(): def test_fluent_output():
@ -75,19 +75,28 @@ def test_fluent_complex_filter():
) )
def test_repr(): def test_node_repr():
in_file = ffmpeg.input('dummy.mp4') in_file = ffmpeg.input('dummy.mp4')
trim1 = ffmpeg.trim(in_file, start_frame=10, end_frame=20) trim1 = ffmpeg.trim(in_file, start_frame=10, end_frame=20)
trim2 = ffmpeg.trim(in_file, start_frame=30, end_frame=40) trim2 = ffmpeg.trim(in_file, start_frame=30, end_frame=40)
trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60) trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60)
concatted = ffmpeg.concat(trim1, trim2, trim3) concatted = ffmpeg.concat(trim1, trim2, trim3)
output = ffmpeg.output(concatted, 'dummy2.mp4') output = ffmpeg.output(concatted, 'dummy2.mp4')
assert repr(in_file) == "input(filename={!r})".format('dummy.mp4') assert repr(in_file.node) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file.node.short_hash)
assert repr(trim1) == "trim(end_frame=20,start_frame=10)" assert repr(trim1.node) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1.node.short_hash)
assert repr(trim2) == "trim(end_frame=40,start_frame=30)" assert repr(trim2.node) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2.node.short_hash)
assert repr(trim3) == "trim(end_frame=60,start_frame=50)" assert repr(trim3.node) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3.node.short_hash)
assert repr(concatted) == "concat(n=3)" assert repr(concatted.node) == "concat(n=3) <{}>".format(concatted.node.short_hash)
assert repr(output) == "output(filename={!r})".format('dummy2.mp4') assert repr(output.node) == "output(filename={!r}) <{}>".format('dummy2.mp4', output.node.short_hash)
def test_stream_repr():
in_file = ffmpeg.input('dummy.mp4')
assert repr(in_file) == "input(filename={!r})[None] <{}>".format('dummy.mp4', in_file.node.short_hash)
split0 = in_file.filter_multi_output('split')[0]
assert repr(split0) == "split()[0] <{}>".format(split0.node.short_hash)
dummy_out = in_file.filter_multi_output('dummy')['out']
assert repr(dummy_out) == "dummy()[{!r}] <{}>".format(dummy_out.label, dummy_out.node.short_hash)
def test_get_args_simple(): def test_get_args_simple():
@ -96,16 +105,23 @@ def test_get_args_simple():
def _get_complex_filter_example(): def _get_complex_filter_example():
in_file = ffmpeg.input(TEST_INPUT_FILE) split = (ffmpeg
.input(TEST_INPUT_FILE1)
.vflip()
.split()
)
split0 = split[0]
split1 = split[1]
overlay_file = ffmpeg.input(TEST_OVERLAY_FILE) overlay_file = ffmpeg.input(TEST_OVERLAY_FILE)
return (ffmpeg return (ffmpeg
.concat( .concat(
in_file.trim(start_frame=10, end_frame=20), split0.trim(start_frame=10, end_frame=20),
in_file.trim(start_frame=30, end_frame=40), split1.trim(start_frame=30, end_frame=40),
) )
.overlay(overlay_file.hflip()) .overlay(overlay_file.hflip())
.drawbox(50, 50, 120, 120, color='red', thickness=5) .drawbox(50, 50, 120, 120, color='red', thickness=5)
.output(TEST_OUTPUT_FILE) .output(TEST_OUTPUT_FILE1)
.overwrite_output() .overwrite_output()
) )
@ -113,17 +129,18 @@ def _get_complex_filter_example():
def test_get_args_complex_filter(): def test_get_args_complex_filter():
out = _get_complex_filter_example() out = _get_complex_filter_example()
args = ffmpeg.get_args(out) args = ffmpeg.get_args(out)
assert args == [ assert args == ['-i', TEST_INPUT_FILE1,
'-i', TEST_INPUT_FILE,
'-i', TEST_OVERLAY_FILE, '-i', TEST_OVERLAY_FILE,
'-filter_complex', '-filter_complex',
'[0]trim=end_frame=20:start_frame=10[v0];' \ '[0]vflip[s0];' \
'[0]trim=end_frame=40:start_frame=30[v1];' \ '[s0]split=2[s1][s2];' \
'[v0][v1]concat=n=2[v2];' \ '[s1]trim=end_frame=20:start_frame=10[s3];' \
'[1]hflip[v3];' \ '[s2]trim=end_frame=40:start_frame=30[s4];' \
'[v2][v3]overlay=eof_action=repeat[v4];' \ '[s3][s4]concat=n=2[s5];' \
'[v4]drawbox=50:50:120:120:red:t=5[v5]', '[1]hflip[s6];' \
'-map', '[v5]', os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4'), '[s5][s6]overlay=eof_action=repeat[s7];' \
'[s7]drawbox=50:50:120:120:red:t=5[s8]',
'-map', '[s8]', TEST_OUTPUT_FILE1,
'-y' '-y'
] ]
@ -139,8 +156,8 @@ def test_filter_normal_arg_escape():
.get_args() .get_args()
) )
assert args[:3] == ['-i', 'in', '-filter_complex'] assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[v0]', 'out'] assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[v0\]', args[3], re.MULTILINE) match = re.match(r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[s0\]', args[3], re.MULTILINE)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3]) assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1) return match.group(1)
@ -156,7 +173,7 @@ def test_filter_normal_arg_escape():
'=': 2, '=': 2,
'\n': 0, '\n': 0,
} }
for ch, expected_backslash_count in expected_backslash_counts.items(): for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch) expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_font_repr(ch) actual = _get_drawtext_font_repr(ch)
assert expected == actual assert expected == actual
@ -173,8 +190,8 @@ def test_filter_text_arg_str_escape():
.get_args() .get_args()
) )
assert args[:3] == ['-i', 'in', '-filter_complex'] assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[v0]', 'out'] assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=text=a((.|\n)*)b\[v0\]', args[3], re.MULTILINE) match = re.match(r'\[0\]drawtext=text=a((.|\n)*)b\[s0\]', args[3], re.MULTILINE)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3]) assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1) return match.group(1)
@ -190,7 +207,7 @@ def test_filter_text_arg_str_escape():
'=': 2, '=': 2,
'\n': 0, '\n': 0,
} }
for ch, expected_backslash_count in expected_backslash_counts.items(): for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch) expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_text_repr(ch) actual = _get_drawtext_text_repr(ch)
assert expected == actual assert expected == actual
@ -201,52 +218,91 @@ def test_filter_text_arg_str_escape():
def test_run(): def test_run():
node = _get_complex_filter_example() stream = _get_complex_filter_example()
ffmpeg.run(node) ffmpeg.run(stream)
def test_run_multi_output():
in_ = ffmpeg.input(TEST_INPUT_FILE1)
out1 = in_.output(TEST_OUTPUT_FILE1)
out2 = in_.output(TEST_OUTPUT_FILE2)
ffmpeg.run([out1, out2], overwrite_output=True)
def test_run_dummy_cmd(): def test_run_dummy_cmd():
node = _get_complex_filter_example() stream = _get_complex_filter_example()
ffmpeg.run(node, cmd='true') ffmpeg.run(stream, cmd='true')
def test_run_dummy_cmd_list(): def test_run_dummy_cmd_list():
node = _get_complex_filter_example() stream = _get_complex_filter_example()
ffmpeg.run(node, cmd=['true', 'ignored']) ffmpeg.run(stream, cmd=['true', 'ignored'])
def test_run_failing_cmd(): def test_run_failing_cmd():
node = _get_complex_filter_example() stream = _get_complex_filter_example()
with pytest.raises(subprocess.CalledProcessError): with pytest.raises(subprocess.CalledProcessError):
ffmpeg.run(node, cmd='false') ffmpeg.run(stream, cmd='false')
def test_custom_filter(): def test_custom_filter():
node = ffmpeg.input('dummy.mp4') stream = ffmpeg.input('dummy.mp4')
node = ffmpeg.filter_(node, 'custom_filter', 'a', 'b', kwarg1='c') stream = ffmpeg.filter_(stream, 'custom_filter', 'a', 'b', kwarg1='c')
node = ffmpeg.output(node, 'dummy2.mp4') stream = ffmpeg.output(stream, 'dummy2.mp4')
assert node.get_args() == [ assert stream.get_args() == [
'-i', 'dummy.mp4', '-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[v0]', '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[v0]', '-map', '[s0]',
'dummy2.mp4' 'dummy2.mp4'
] ]
def test_custom_filter_fluent(): def test_custom_filter_fluent():
node = (ffmpeg stream = (ffmpeg
.input('dummy.mp4') .input('dummy.mp4')
.filter_('custom_filter', 'a', 'b', kwarg1='c') .filter_('custom_filter', 'a', 'b', kwarg1='c')
.output('dummy2.mp4') .output('dummy2.mp4')
) )
assert node.get_args() == [ assert stream.get_args() == [
'-i', 'dummy.mp4', '-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[v0]', '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[v0]', '-map', '[s0]',
'dummy2.mp4' 'dummy2.mp4'
] ]
def test_merge_outputs():
in_ = ffmpeg.input('in.mp4')
out1 = in_.output('out1.mp4')
out2 = in_.output('out2.mp4')
assert ffmpeg.merge_outputs(out1, out2).get_args() == [
'-i', 'in.mp4', 'out1.mp4', 'out2.mp4'
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in.mp4', 'out2.mp4', 'out1.mp4'
]
def test_multi_passthrough():
out1 = ffmpeg.input('in1.mp4').output('out1.mp4')
out2 = ffmpeg.input('in2.mp4').output('out2.mp4')
out = ffmpeg.merge_outputs(out1, out2)
assert ffmpeg.get_args(out) == [
'-i', 'in1.mp4',
'-i', 'in2.mp4',
'out1.mp4',
'-map', '[1]', # FIXME: this should not be here (see #23)
'out2.mp4'
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in2.mp4',
'-i', 'in1.mp4',
'out2.mp4',
'-map', '[1]', # FIXME: this should not be here (see #23)
'out1.mp4'
]
def test_pipe(): def test_pipe():
width = 32 width = 32
height = 32 height = 32
@ -268,8 +324,8 @@ def test_pipe():
'-pixel_format', 'rgb24', '-pixel_format', 'rgb24',
'-i', 'pipe:0', '-i', 'pipe:0',
'-filter_complex', '-filter_complex',
'[0]trim=start_frame=2[v0]', '[0]trim=start_frame=2[s0]',
'-map', '[v0]', '-map', '[s0]',
'-f', 'rawvideo', '-f', 'rawvideo',
'pipe:1' 'pipe:1'
] ]