Use Black formatter

This commit is contained in:
Karl Kroening 2019-06-03 04:03:37 -05:00
parent 995cf67d7d
commit 8ea0f4ca4b
11 changed files with 441 additions and 255 deletions

View File

@ -94,9 +94,4 @@ def output(*streams_and_filename, **kwargs):
return OutputNode(streams, output.__name__, kwargs=kwargs).stream()
__all__ = [
'input',
'merge_outputs',
'output',
'overwrite_output',
]
__all__ = ['input', 'merge_outputs', 'output', 'overwrite_output']

View File

@ -21,7 +21,9 @@ def filter_multi_output(stream_spec, filter_name, *args, **kwargs):
ffmpeg.concat(split0, split1).output('out.mp4').run()
```
"""
return FilterNode(stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None)
return FilterNode(
stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None
)
@filter_operator()
@ -144,7 +146,12 @@ def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs
Official documentation: `overlay <https://ffmpeg.org/ffmpeg-filters.html#overlay-1>`__
"""
kwargs['eof_action'] = eof_action
return FilterNode([main_parent_node, overlay_parent_node], overlay.__name__, kwargs=kwargs, max_inputs=2).stream()
return FilterNode(
[main_parent_node, overlay_parent_node],
overlay.__name__,
kwargs=kwargs,
max_inputs=2,
).stream()
@filter_operator()
@ -180,10 +187,7 @@ def crop(stream, x, y, width, height, **kwargs):
Official documentation: `crop <https://ffmpeg.org/ffmpeg-filters.html#crop>`__
"""
return FilterNode(
stream,
crop.__name__,
args=[width, height, x, y],
kwargs=kwargs
stream, crop.__name__, args=[width, height, x, y], kwargs=kwargs
).stream()
@ -209,7 +213,9 @@ def drawbox(stream, x, y, width, height, color, thickness=None, **kwargs):
"""
if thickness:
kwargs['t'] = thickness
return FilterNode(stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs).stream()
return FilterNode(
stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs
).stream()
@filter_operator()
@ -385,8 +391,10 @@ def concat(*streams, **kwargs):
stream_count = video_stream_count + audio_stream_count
if len(streams) % stream_count != 0:
raise ValueError(
'Expected concat input streams to have length multiple of {} (v={}, a={}); got {}'
.format(stream_count, video_stream_count, audio_stream_count, len(streams)))
'Expected concat input streams to have length multiple of {} (v={}, a={}); got {}'.format(
stream_count, video_stream_count, audio_stream_count, len(streams)
)
)
kwargs['n'] = int(len(streams) / stream_count)
return FilterNode(streams, concat.__name__, kwargs=kwargs, max_inputs=None).stream()

View File

@ -24,6 +24,4 @@ def probe(filename, cmd='ffprobe', **kwargs):
return json.loads(out.decode('utf-8'))
__all__ = [
'probe',
]
__all__ = ['probe']

View File

@ -8,10 +8,7 @@ import copy
import operator
import subprocess
from ._ffmpeg import (
input,
output,
)
from ._ffmpeg import input, output
from .nodes import (
get_stream_spec_nodes,
FilterNode,
@ -24,7 +21,9 @@ from .nodes import (
class Error(Exception):
def __init__(self, cmd, stdout, stderr):
super(Error, self).__init__('{} error (see stderr output for detail)'.format(cmd))
super(Error, self).__init__(
'{} error (see stderr output for detail)'.format(cmd)
)
self.stdout = stdout
self.stderr = stderr
@ -69,9 +68,15 @@ def _format_output_stream_name(stream_name_map, edge):
def _get_filter_spec(node, outgoing_edge_map, stream_name_map):
incoming_edges = node.incoming_edges
outgoing_edges = get_outgoing_edges(node, outgoing_edge_map)
inputs = [_format_input_stream_name(stream_name_map, edge) for edge in incoming_edges]
outputs = [_format_output_stream_name(stream_name_map, edge) for edge in outgoing_edges]
filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs))
inputs = [
_format_input_stream_name(stream_name_map, edge) for edge in incoming_edges
]
outputs = [
_format_output_stream_name(stream_name_map, edge) for edge in outgoing_edges
]
filter_spec = '{}{}{}'.format(
''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs)
)
return filter_spec
@ -84,14 +89,20 @@ def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_
# TODO: automatically insert `splits` ahead of time via graph transformation.
raise ValueError(
'Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
'`split` filter is probably required'.format(upstream_node, upstream_label))
'`split` filter is probably required'.format(
upstream_node, upstream_label
)
)
stream_name_map[upstream_node, upstream_label] = 's{}'.format(stream_count)
stream_count += 1
def _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map):
_allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map)
filter_specs = [_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map) for node in filter_nodes]
filter_specs = [
_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map)
for node in filter_nodes
]
return ';'.join(filter_specs)
@ -109,7 +120,9 @@ def _get_output_args(node, stream_name_map):
for edge in node.incoming_edges:
# edge = node.incoming_edges[0]
stream_name = _format_input_stream_name(stream_name_map, edge, is_final_arg=True)
stream_name = _format_input_stream_name(
stream_name_map, edge, is_final_arg=True
)
if stream_name != '0' or len(node.incoming_edges) > 1:
args += ['-map', stream_name]
@ -123,7 +136,9 @@ def _get_output_args(node, stream_name_map):
args += ['-b:a', str(kwargs.pop('audio_bitrate'))]
if 'video_size' in kwargs:
video_size = kwargs.pop('video_size')
if not isinstance(video_size, basestring) and isinstance(video_size, collections.Iterable):
if not isinstance(video_size, basestring) and isinstance(
video_size, collections.Iterable
):
video_size = '{}x{}'.format(video_size[0], video_size[1])
args += ['-video_size', video_size]
args += convert_kwargs_to_cmd_line_args(kwargs)
@ -147,7 +162,9 @@ def get_args(stream_spec, overwrite_output=False):
args += reduce(operator.add, [_get_input_args(node) for node in input_nodes])
if filter_arg:
args += ['-filter_complex', filter_arg]
args += reduce(operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes])
args += reduce(
operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]
)
args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], [])
if overwrite_output:
args += ['-y']
@ -175,8 +192,14 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
@output_operator()
def run_async(
stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False,
quiet=False, overwrite_output=False):
stream_spec,
cmd='ffmpeg',
pipe_stdin=False,
pipe_stdout=False,
pipe_stderr=False,
quiet=False,
overwrite_output=False,
):
"""Asynchronously invoke ffmpeg for the supplied node graph.
Args:
@ -259,13 +282,20 @@ def run_async(
stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None
stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None
return subprocess.Popen(
args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream
)
@output_operator()
def run(
stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None,
quiet=False, overwrite_output=False):
stream_spec,
cmd='ffmpeg',
capture_stdout=False,
capture_stderr=False,
input=None,
quiet=False,
overwrite_output=False,
):
"""Invoke ffmpeg for the supplied node graph.
Args:
@ -296,10 +326,4 @@ def run(
return out, err
__all__ = [
'compile',
'Error',
'get_args',
'run',
'run_async',
]
__all__ = ['compile', 'Error', 'get_args', 'run', 'run_async']

View File

@ -34,8 +34,11 @@ def with_metaclass(meta, *bases):
if sys.version_info.major >= 3:
class basestring(with_metaclass(BaseBaseString)):
pass
else:
# noinspection PyUnresolvedReferences,PyCompatibility
from builtins import basestring
@ -52,7 +55,10 @@ def _recursive_repr(item):
elif isinstance(item, list):
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
elif isinstance(item, dict):
kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)]
kv_pairs = [
'{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k]))
for k in sorted(item)
]
result = '{' + ', '.join(kv_pairs) + '}'
else:
result = repr(item)

View File

@ -34,8 +34,10 @@ def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
try:
import graphviz
except ImportError:
raise ImportError('failed to import graphviz; please make sure graphviz is installed (e.g. `pip install '
'graphviz`)')
raise ImportError(
'failed to import graphviz; please make sure graphviz is installed (e.g. `pip install '
'graphviz`)'
)
show_labels = kwargs.pop('show_labels', True)
if pipe and filename is not None:
@ -49,7 +51,9 @@ def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
graph = graphviz.Digraph(format='png')
graph.attr(rankdir='LR')
if len(list(kwargs.keys())) != 0:
raise ValueError('Invalid kwargs key(s): {}'.format(', '.join(list(kwargs.keys()))))
raise ValueError(
'Invalid kwargs key(s): {}'.format(', '.join(list(kwargs.keys())))
)
for node in sorted_nodes:
color = _get_node_color(node)
@ -57,11 +61,15 @@ def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
if detail:
lines = [node.short_repr]
lines += ['{!r}'.format(arg) for arg in node.args]
lines += ['{}={!r}'.format(key, node.kwargs[key]) for key in sorted(node.kwargs)]
lines += [
'{}={!r}'.format(key, node.kwargs[key]) for key in sorted(node.kwargs)
]
node_text = '\n'.join(lines)
else:
node_text = node.short_repr
graph.node(str(hash(node)), node_text, shape='box', style='filled', fillcolor=color)
graph.node(
str(hash(node)), node_text, shape='box', style='filled', fillcolor=color
)
outgoing_edge_map = outgoing_edge_maps.get(node, {})
for edge in get_outgoing_edges(node, outgoing_edge_map):
@ -70,7 +78,11 @@ def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
down_label = edge.downstream_label
up_selector = edge.upstream_selector
if show_labels and (up_label is not None or down_label is not None or up_selector is not None):
if show_labels and (
up_label is not None
or down_label is not None
or up_selector is not None
):
if up_label is None:
up_label = ''
if up_selector is not None:
@ -93,6 +105,4 @@ def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
return stream_spec
__all__ = [
'view',
]
__all__ = ['view']

View File

@ -70,14 +70,31 @@ class DagNode(object):
raise NotImplementedError()
DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label', 'upstream_selector'])
DagEdge = namedtuple(
"DagEdge",
[
"downstream_node",
"downstream_label",
"upstream_node",
"upstream_label",
"upstream_selector",
],
)
def get_incoming_edges(downstream_node, incoming_edge_map):
edges = []
for downstream_label, upstream_info in list(incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label, upstream_selector)]
edges += [
DagEdge(
downstream_node,
downstream_label,
upstream_node,
upstream_label,
upstream_selector,
)
]
return edges
@ -86,7 +103,15 @@ def get_outgoing_edges(upstream_node, outgoing_edge_map):
for upstream_label, downstream_infos in list(outgoing_edge_map.items()):
for downstream_info in downstream_infos:
downstream_node, downstream_label, downstream_selector = downstream_info
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label, downstream_selector)]
edges += [
DagEdge(
downstream_node,
downstream_label,
upstream_node,
upstream_label,
downstream_selector,
)
]
return edges
@ -99,12 +124,20 @@ class KwargReprNode(DagNode):
hashes = []
for downstream_label, upstream_info in list(self.incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label, upstream_selector]]
hashes += [
hash(x)
for x in [
downstream_label,
upstream_node,
upstream_label,
upstream_selector,
]
]
return hashes
@property
def __inner_hash(self):
props = {'args': self.args, 'kwargs': self.kwargs}
props = {"args": self.args, "kwargs": self.kwargs}
return get_hash(props)
def __get_hash(self):
@ -126,14 +159,16 @@ class KwargReprNode(DagNode):
@property
def short_hash(self):
return '{:x}'.format(abs(hash(self)))[:12]
return "{:x}".format(abs(hash(self)))[:12]
def long_repr(self, include_hash=True):
formatted_props = ['{!r}'.format(arg) for arg in self.args]
formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)]
out = '{}({})'.format(self.name, ', '.join(formatted_props))
formatted_props = ["{!r}".format(arg) for arg in self.args]
formatted_props += [
"{}={!r}".format(key, self.kwargs[key]) for key in sorted(self.kwargs)
]
out = "{}({})".format(self.name, ", ".join(formatted_props))
if include_hash:
out += ' <{}>'.format(self.short_hash)
out += " <{}>".format(self.short_hash)
return out
def __repr__(self):
@ -157,21 +192,35 @@ def topo_sort(downstream_nodes):
sorted_nodes = []
outgoing_edge_maps = {}
def visit(upstream_node, upstream_label, downstream_node, downstream_label, downstream_selector=None):
def visit(
upstream_node,
upstream_label,
downstream_node,
downstream_label,
downstream_selector=None,
):
if upstream_node in marked_nodes:
raise RuntimeError('Graph is not a DAG')
raise RuntimeError("Graph is not a DAG")
if downstream_node is not None:
outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {})
outgoing_edge_infos = outgoing_edge_map.get(upstream_label, [])
outgoing_edge_infos += [(downstream_node, downstream_label, downstream_selector)]
outgoing_edge_infos += [
(downstream_node, downstream_label, downstream_selector)
]
outgoing_edge_map[upstream_label] = outgoing_edge_infos
outgoing_edge_maps[upstream_node] = outgoing_edge_map
if upstream_node not in sorted_nodes:
marked_nodes.append(upstream_node)
for edge in upstream_node.incoming_edges:
visit(edge.upstream_node, edge.upstream_label, edge.downstream_node, edge.downstream_label, edge.upstream_selector)
visit(
edge.upstream_node,
edge.upstream_label,
edge.downstream_node,
edge.downstream_label,
edge.upstream_selector,
)
marked_nodes.remove(upstream_node)
sorted_nodes.append(upstream_node)

View File

@ -23,10 +23,15 @@ def _get_types_str(types):
class Stream(object):
"""Represents the outgoing edge of an upstream node; may be used to create more downstream nodes."""
def __init__(self, upstream_node, upstream_label, node_types, upstream_selector=None):
def __init__(
self, upstream_node, upstream_label, node_types, upstream_selector=None
):
if not _is_of_types(upstream_node, node_types):
raise TypeError('Expected upstream node to be of one of the following type(s): {}; got {}'.format(
_get_types_str(node_types), type(upstream_node)))
raise TypeError(
'Expected upstream node to be of one of the following type(s): {}; got {}'.format(
_get_types_str(node_types), type(upstream_node)
)
)
self.node = upstream_node
self.label = upstream_label
self.selector = upstream_selector
@ -42,7 +47,9 @@ class Stream(object):
selector = ''
if self.selector:
selector = ':{}'.format(self.selector)
out = '{}[{!r}{}] <{}>'.format(node_repr, self.label, selector, self.node.short_hash)
out = '{}[{!r}{}] <{}>'.format(
node_repr, self.label, selector, self.node.short_hash
)
return out
def __getitem__(self, index):
@ -146,26 +153,50 @@ class Node(KwargReprNode):
@classmethod
def __check_input_len(cls, stream_map, min_inputs, max_inputs):
if min_inputs is not None and len(stream_map) < min_inputs:
raise ValueError('Expected at least {} input stream(s); got {}'.format(min_inputs, len(stream_map)))
raise ValueError(
'Expected at least {} input stream(s); got {}'.format(
min_inputs, len(stream_map)
)
)
elif max_inputs is not None and len(stream_map) > max_inputs:
raise ValueError('Expected at most {} input stream(s); got {}'.format(max_inputs, len(stream_map)))
raise ValueError(
'Expected at most {} input stream(s); got {}'.format(
max_inputs, len(stream_map)
)
)
@classmethod
def __check_input_types(cls, stream_map, incoming_stream_types):
for stream in list(stream_map.values()):
if not _is_of_types(stream, incoming_stream_types):
raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}'
.format(_get_types_str(incoming_stream_types), type(stream)))
raise TypeError(
'Expected incoming stream(s) to be of one of the following types: {}; got {}'.format(
_get_types_str(incoming_stream_types), type(stream)
)
)
@classmethod
def __get_incoming_edge_map(cls, stream_map):
incoming_edge_map = {}
for downstream_label, upstream in list(stream_map.items()):
incoming_edge_map[downstream_label] = (upstream.node, upstream.label, upstream.selector)
incoming_edge_map[downstream_label] = (
upstream.node,
upstream.label,
upstream.selector,
)
return incoming_edge_map
def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs,
max_inputs, args=[], kwargs={}):
def __init__(
self,
stream_spec,
name,
incoming_stream_types,
outgoing_stream_type,
min_inputs,
max_inputs,
args=[],
kwargs={},
):
stream_map = get_stream_map(stream_spec)
self.__check_input_len(stream_map, min_inputs, max_inputs)
self.__check_input_types(stream_map, incoming_stream_types)
@ -203,8 +234,9 @@ class Node(KwargReprNode):
class FilterableStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(FilterableStream, self).__init__(upstream_node, upstream_label, {InputNode, FilterNode},
upstream_selector)
super(FilterableStream, self).__init__(
upstream_node, upstream_label, {InputNode, FilterNode}, upstream_selector
)
# noinspection PyMethodOverriding
@ -220,7 +252,7 @@ class InputNode(Node):
min_inputs=0,
max_inputs=0,
args=args,
kwargs=kwargs
kwargs=kwargs,
)
@property
@ -239,7 +271,7 @@ class FilterNode(Node):
min_inputs=1,
max_inputs=max_inputs,
args=args,
kwargs=kwargs
kwargs=kwargs,
)
"""FilterNode"""
@ -279,7 +311,7 @@ class OutputNode(Node):
min_inputs=1,
max_inputs=None,
args=args,
kwargs=kwargs
kwargs=kwargs,
)
@property
@ -289,8 +321,12 @@ class OutputNode(Node):
class OutputStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode, MergeOutputsNode},
upstream_selector=upstream_selector)
super(OutputStream, self).__init__(
upstream_node,
upstream_label,
{OutputNode, GlobalNode, MergeOutputsNode},
upstream_selector=upstream_selector,
)
# noinspection PyMethodOverriding
@ -302,7 +338,7 @@ class MergeOutputsNode(Node):
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None
max_inputs=None,
)
@ -317,7 +353,7 @@ class GlobalNode(Node):
min_inputs=1,
max_inputs=1,
args=args,
kwargs=kwargs
kwargs=kwargs,
)
@ -338,6 +374,4 @@ def output_operator(name=None):
return stream_operator(stream_classes={OutputStream}, name=name)
__all__ = [
'Stream',
]
__all__ = ['Stream']

View File

@ -30,7 +30,10 @@ subprocess.check_call(['ffmpeg', '-version'])
def test_escape_chars():
assert ffmpeg._utils.escape_chars('a:b', ':') == 'a\:b'
assert ffmpeg._utils.escape_chars('a\\:b', ':\\') == 'a\\\\\\:b'
assert ffmpeg._utils.escape_chars('a:b,c[d]e%{}f\'g\'h\\i', '\\\':,[]%') == 'a\\:b\\,c\\[d\\]e\\%{}f\\\'g\\\'h\\\\i'
assert (
ffmpeg._utils.escape_chars('a:b,c[d]e%{}f\'g\'h\\i', '\\\':,[]%')
== 'a\\:b\\,c\\[d\\]e\\%{}f\\\'g\\\'h\\\\i'
)
assert ffmpeg._utils.escape_chars(123, ':\\') == '123'
@ -62,23 +65,16 @@ def test_fluent_concat():
def test_fluent_output():
(ffmpeg
.input('dummy.mp4')
.trim(start_frame=10, end_frame=20)
.output('dummy2.mp4')
)
ffmpeg.input('dummy.mp4').trim(start_frame=10, end_frame=20).output('dummy2.mp4')
def test_fluent_complex_filter():
in_file = ffmpeg.input('dummy.mp4')
return (ffmpeg
.concat(
in_file.trim(start_frame=10, end_frame=20),
in_file.trim(start_frame=30, end_frame=40),
in_file.trim(start_frame=50, end_frame=60)
)
.output('dummy2.mp4')
)
return ffmpeg.concat(
in_file.trim(start_frame=10, end_frame=20),
in_file.trim(start_frame=30, end_frame=40),
in_file.trim(start_frame=50, end_frame=60),
).output('dummy2.mp4')
def test_node_repr():
@ -88,21 +84,35 @@ def test_node_repr():
trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60)
concatted = ffmpeg.concat(trim1, trim2, trim3)
output = ffmpeg.output(concatted, 'dummy2.mp4')
assert repr(in_file.node) == 'input(filename={!r}) <{}>'.format('dummy.mp4', in_file.node.short_hash)
assert repr(trim1.node) == 'trim(end_frame=20, start_frame=10) <{}>'.format(trim1.node.short_hash)
assert repr(trim2.node) == 'trim(end_frame=40, start_frame=30) <{}>'.format(trim2.node.short_hash)
assert repr(trim3.node) == 'trim(end_frame=60, start_frame=50) <{}>'.format(trim3.node.short_hash)
assert repr(in_file.node) == 'input(filename={!r}) <{}>'.format(
'dummy.mp4', in_file.node.short_hash
)
assert repr(trim1.node) == 'trim(end_frame=20, start_frame=10) <{}>'.format(
trim1.node.short_hash
)
assert repr(trim2.node) == 'trim(end_frame=40, start_frame=30) <{}>'.format(
trim2.node.short_hash
)
assert repr(trim3.node) == 'trim(end_frame=60, start_frame=50) <{}>'.format(
trim3.node.short_hash
)
assert repr(concatted.node) == 'concat(n=3) <{}>'.format(concatted.node.short_hash)
assert repr(output.node) == 'output(filename={!r}) <{}>'.format('dummy2.mp4', output.node.short_hash)
assert repr(output.node) == 'output(filename={!r}) <{}>'.format(
'dummy2.mp4', output.node.short_hash
)
def test_stream_repr():
in_file = ffmpeg.input('dummy.mp4')
assert repr(in_file) == 'input(filename={!r})[None] <{}>'.format('dummy.mp4', in_file.node.short_hash)
assert repr(in_file) == 'input(filename={!r})[None] <{}>'.format(
'dummy.mp4', in_file.node.short_hash
)
split0 = in_file.filter_multi_output('split')[0]
assert repr(split0) == 'split()[0] <{}>'.format(split0.node.short_hash)
dummy_out = in_file.filter_multi_output('dummy')['out']
assert repr(dummy_out) == 'dummy()[{!r}] <{}>'.format(dummy_out.label, dummy_out.node.short_hash)
assert repr(dummy_out) == 'dummy()[{!r}] <{}>'.format(
dummy_out.label, dummy_out.node.short_hash
)
def test__get_args__simple():
@ -111,8 +121,18 @@ def test__get_args__simple():
def test_global_args():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4').global_args('-progress', 'someurl')
assert out_file.get_args() == ['-i', 'dummy.mp4', 'dummy2.mp4', '-progress', 'someurl']
out_file = (
ffmpeg.input('dummy.mp4')
.output('dummy2.mp4')
.global_args('-progress', 'someurl')
)
assert out_file.get_args() == [
'-i',
'dummy.mp4',
'dummy2.mp4',
'-progress',
'someurl',
]
def _get_simple_example():
@ -120,18 +140,14 @@ def _get_simple_example():
def _get_complex_filter_example():
split = (ffmpeg
.input(TEST_INPUT_FILE1)
.vflip()
.split()
)
split = ffmpeg.input(TEST_INPUT_FILE1).vflip().split()
split0 = split[0]
split1 = split[1]
overlay_file = ffmpeg.input(TEST_OVERLAY_FILE)
overlay_file = ffmpeg.crop(overlay_file, 10, 10, 158, 112)
return (ffmpeg
.concat(
return (
ffmpeg.concat(
split0.trim(start_frame=10, end_frame=20),
split1.trim(start_frame=30, end_frame=40),
)
@ -145,20 +161,25 @@ def _get_complex_filter_example():
def test__get_args__complex_filter():
out = _get_complex_filter_example()
args = ffmpeg.get_args(out)
assert args == ['-i', TEST_INPUT_FILE1,
'-i', TEST_OVERLAY_FILE,
assert args == [
'-i',
TEST_INPUT_FILE1,
'-i',
TEST_OVERLAY_FILE,
'-filter_complex',
'[0]vflip[s0];' \
'[s0]split=2[s1][s2];' \
'[s1]trim=end_frame=20:start_frame=10[s3];' \
'[s2]trim=end_frame=40:start_frame=30[s4];' \
'[s3][s4]concat=n=2[s5];' \
'[1]crop=158:112:10:10[s6];' \
'[s6]hflip[s7];' \
'[s5][s7]overlay=eof_action=repeat[s8];' \
'[s8]drawbox=50:50:120:120:red:t=5[s9]',
'-map', '[s9]', TEST_OUTPUT_FILE1,
'-y'
'[0]vflip[s0];'
'[s0]split=2[s1][s2];'
'[s1]trim=end_frame=20:start_frame=10[s3];'
'[s2]trim=end_frame=40:start_frame=30[s4];'
'[s3][s4]concat=n=2[s5];'
'[1]crop=158:112:10:10[s6];'
'[s6]hflip[s7];'
'[s5][s7]overlay=eof_action=repeat[s8];'
'[s8]drawbox=50:50:120:120:red:t=5[s9]',
'-map',
'[s9]',
TEST_OUTPUT_FILE1,
'-y',
]
@ -167,11 +188,15 @@ def test_combined_output():
i2 = ffmpeg.input(TEST_OVERLAY_FILE)
out = ffmpeg.output(i1, i2, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i', TEST_INPUT_FILE1,
'-i', TEST_OVERLAY_FILE,
'-map', '0',
'-map', '1',
TEST_OUTPUT_FILE1
'-i',
TEST_INPUT_FILE1,
'-i',
TEST_OVERLAY_FILE,
'-map',
'0',
'-map',
'1',
TEST_OUTPUT_FILE1,
]
@ -186,16 +211,18 @@ def test_filter_with_selector(use_shorthand):
a1 = i['a'].filter('aecho', 0.8, 0.9, 1000, 0.3)
out = ffmpeg.output(a1, v1, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i', TEST_INPUT_FILE1,
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
'[0:a]aecho=0.8:0.9:1000:0.3[s0];' \
'[0:v]hflip[s1]',
'-map', '[s0]', '-map', '[s1]',
TEST_OUTPUT_FILE1
'[0:a]aecho=0.8:0.9:1000:0.3[s0];' '[0:v]hflip[s1]',
'-map',
'[s0]',
'-map',
'[s1]',
TEST_OUTPUT_FILE1,
]
def test_get_item_with_bad_selectors():
input = ffmpeg.input(TEST_INPUT_FILE1)
@ -213,16 +240,12 @@ def test_get_item_with_bad_selectors():
def _get_complex_filter_asplit_example():
split = (ffmpeg
.input(TEST_INPUT_FILE1)
.vflip()
.asplit()
)
split = ffmpeg.input(TEST_INPUT_FILE1).vflip().asplit()
split0 = split[0]
split1 = split[1]
return (ffmpeg
.concat(
return (
ffmpeg.concat(
split0.filter('atrim', start=10, end=20),
split1.filter('atrim', start=30, end=40),
)
@ -234,12 +257,7 @@ def _get_complex_filter_asplit_example():
def test_filter_concat__video_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = (
ffmpeg
.concat(in1, in2)
.output('out.mp4')
.get_args()
)
args = ffmpeg.concat(in1, in2).output('out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
@ -256,12 +274,7 @@ def test_filter_concat__video_only():
def test_filter_concat__audio_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = (
ffmpeg
.concat(in1, in2, v=0, a=1)
.output('out.mp4')
.get_args()
)
args = ffmpeg.concat(in1, in2, v=0, a=1).output('out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
@ -271,7 +284,7 @@ def test_filter_concat__audio_only():
'[0][1]concat=a=1:n=2:v=0[s0]',
'-map',
'[s0]',
'out.mp4'
'out.mp4',
]
@ -279,11 +292,7 @@ def test_filter_concat__audio_video():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
joined = ffmpeg.concat(in1.video, in1.audio, in2.hflip(), in2['a'], v=1, a=1).node
args = (
ffmpeg
.output(joined[0], joined[1], 'out.mp4')
.get_args()
)
args = ffmpeg.output(joined[0], joined[1], 'out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
@ -304,8 +313,10 @@ def test_filter_concat__wrong_stream_count():
in2 = ffmpeg.input('in2.mp4')
with pytest.raises(ValueError) as excinfo:
ffmpeg.concat(in1.video, in1.audio, in2.hflip(), v=1, a=1).node
assert str(excinfo.value) == \
'Expected concat input streams to have length multiple of 2 (v=1, a=1); got 3'
assert (
str(excinfo.value)
== 'Expected concat input streams to have length multiple of 2 (v=1, a=1); got 3'
)
def test_filter_asplit():
@ -320,14 +331,13 @@ def test_filter_asplit():
'-map',
'[s5]',
TEST_OUTPUT_FILE1,
'-y'
'-y',
]
def test__output__bitrate():
args = (
ffmpeg
.input('in')
ffmpeg.input('in')
.output('out', video_bitrate=1000, audio_bitrate=200)
.get_args()
)
@ -336,28 +346,26 @@ def test__output__bitrate():
@pytest.mark.parametrize('video_size', [(320, 240), '320x240'])
def test__output__video_size(video_size):
args = (
ffmpeg
.input('in')
.output('out', video_size=video_size)
.get_args()
)
args = ffmpeg.input('in').output('out', video_size=video_size).get_args()
assert args == ['-i', 'in', '-video_size', '320x240', 'out']
def test_filter_normal_arg_escape():
"""Test string escaping of normal filter args (e.g. ``font`` param of ``drawtext`` filter)."""
def _get_drawtext_font_repr(font):
"""Build a command-line arg using drawtext ``font`` param and extract the ``-filter_complex`` arg."""
args = (ffmpeg
.input('in')
args = (
ffmpeg.input('in')
.drawtext('test', font='a{}b'.format(font))
.output('out')
.get_args()
)
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[s0\]', args[3], re.MULTILINE)
match = re.match(
r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[s0\]', args[3], re.MULTILINE
)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1)
@ -381,14 +389,10 @@ def test_filter_normal_arg_escape():
def test_filter_text_arg_str_escape():
"""Test string escaping of normal filter args (e.g. ``text`` param of ``drawtext`` filter)."""
def _get_drawtext_text_repr(text):
"""Build a command-line arg using drawtext ``text`` param and extract the ``-filter_complex`` arg."""
args = (ffmpeg
.input('in')
.drawtext('a{}b'.format(text))
.output('out')
.get_args()
)
args = ffmpeg.input('in').drawtext('a{}b'.format(text)).output('out').get_args()
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=text=a((.|\n)*)b\[s0\]', args[3], re.MULTILINE)
@ -413,14 +417,19 @@ def test_filter_text_arg_str_escape():
assert expected == actual
#def test_version():
# def test_version():
# subprocess.check_call(['ffmpeg', '-version'])
def test__compile():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4')
assert out_file.compile() == ['ffmpeg', '-i', 'dummy.mp4', 'dummy2.mp4']
assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4']
assert out_file.compile(cmd='ffmpeg.old') == [
'ffmpeg.old',
'-i',
'dummy.mp4',
'dummy2.mp4',
]
@pytest.mark.parametrize('pipe_stdin', [True, False])
@ -431,15 +440,18 @@ def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr):
popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock)
stream = _get_simple_example()
process = ffmpeg.run_async(
stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr)
stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr
)
assert process is process__mock
expected_stdin = subprocess.PIPE if pipe_stdin else None
expected_stdout = subprocess.PIPE if pipe_stdout else None
expected_stderr = subprocess.PIPE if pipe_stderr else None
(args,), kwargs = popen__mock.call_args
(args,), kwargs = popen__mock.call_args
assert args == ffmpeg.compile(stream)
assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr)
assert kwargs == dict(
stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr
)
def test__run():
@ -454,7 +466,9 @@ def test__run():
def test__run__capture_out(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['echo', 'test'])
stream = _get_simple_example()
out, err = ffmpeg.run(stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr)
out, err = ffmpeg.run(
stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr
)
if capture_stdout:
assert out == 'test\n'.encode()
else:
@ -479,7 +493,9 @@ def test__run__error(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['ffmpeg'])
stream = _get_complex_filter_example()
with pytest.raises(ffmpeg.Error) as excinfo:
out, err = ffmpeg.run(stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr)
out, err = ffmpeg.run(
stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr
)
assert str(excinfo.value) == 'ffmpeg error (see stderr output for detail)'
out = excinfo.value.stdout
err = excinfo.value.stderr
@ -515,24 +531,30 @@ def test__filter__custom():
stream = ffmpeg.filter(stream, 'custom_filter', 'a', 'b', kwarg1='c')
stream = ffmpeg.output(stream, 'dummy2.mp4')
assert stream.get_args() == [
'-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[s0]',
'dummy2.mp4'
'-i',
'dummy.mp4',
'-filter_complex',
'[0]custom_filter=a:b:kwarg1=c[s0]',
'-map',
'[s0]',
'dummy2.mp4',
]
def test__filter__custom_fluent():
stream = (ffmpeg
.input('dummy.mp4')
stream = (
ffmpeg.input('dummy.mp4')
.filter('custom_filter', 'a', 'b', kwarg1='c')
.output('dummy2.mp4')
)
assert stream.get_args() == [
'-i', 'dummy.mp4',
'-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]',
'-map', '[s0]',
'dummy2.mp4'
'-i',
'dummy.mp4',
'-filter_complex',
'[0]custom_filter=a:b:kwarg1=c[s0]',
'-map',
'[s0]',
'dummy2.mp4',
]
@ -541,16 +563,29 @@ def test__merge_outputs():
out1 = in_.output('out1.mp4')
out2 = in_.output('out2.mp4')
assert ffmpeg.merge_outputs(out1, out2).get_args() == [
'-i', 'in.mp4', 'out1.mp4', 'out2.mp4'
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in.mp4', 'out2.mp4', 'out1.mp4'
'-i',
'in.mp4',
'out1.mp4',
'out2.mp4',
]
assert ffmpeg.get_args([out1, out2]) == ['-i', 'in.mp4', 'out2.mp4', 'out1.mp4']
def test__input__start_time():
assert ffmpeg.input('in', ss=10.5).output('out').get_args() == ['-ss', '10.5', '-i', 'in', 'out']
assert ffmpeg.input('in', ss=0.0).output('out').get_args() == ['-ss', '0.0', '-i', 'in', 'out']
assert ffmpeg.input('in', ss=10.5).output('out').get_args() == [
'-ss',
'10.5',
'-i',
'in',
'out',
]
assert ffmpeg.input('in', ss=0.0).output('out').get_args() == [
'-ss',
'0.0',
'-i',
'in',
'out',
]
def test_multi_passthrough():
@ -558,49 +593,53 @@ def test_multi_passthrough():
out2 = ffmpeg.input('in2.mp4').output('out2.mp4')
out = ffmpeg.merge_outputs(out1, out2)
assert ffmpeg.get_args(out) == [
'-i', 'in1.mp4',
'-i', 'in2.mp4',
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'out1.mp4',
'-map', '1',
'out2.mp4'
'-map',
'1',
'out2.mp4',
]
assert ffmpeg.get_args([out1, out2]) == [
'-i', 'in2.mp4',
'-i', 'in1.mp4',
'-i',
'in2.mp4',
'-i',
'in1.mp4',
'out2.mp4',
'-map', '1',
'out1.mp4'
'-map',
'1',
'out1.mp4',
]
def test_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = (
ffmpeg
.output(i1['1'], i1['2'], TEST_OUTPUT_FILE1)
.get_args()
)
args = ffmpeg.output(i1['1'], i1['2'], TEST_OUTPUT_FILE1).get_args()
assert args == [
'-i', TEST_INPUT_FILE1,
'-map', '0:1',
'-map', '0:2',
'-i',
TEST_INPUT_FILE1,
'-map',
'0:1',
'-map',
'0:2',
TEST_OUTPUT_FILE1,
]
def test_mixed_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = (
ffmpeg
.output(i1['1'].hflip(), i1['2'], TEST_OUTPUT_FILE1)
.get_args()
)
args = ffmpeg.output(i1['1'].hflip(), i1['2'], TEST_OUTPUT_FILE1).get_args()
assert args == [
'-i', TEST_INPUT_FILE1,
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
'[0:1]hflip[s0]',
'-map', '[s0]',
'-map', '0:2',
'-map',
'[s0]',
'-map',
'0:2',
TEST_OUTPUT_FILE1,
]
@ -612,36 +651,53 @@ def test_pipe():
frame_count = 10
start_frame = 2
out = (ffmpeg
.input('pipe:0', format='rawvideo', pixel_format='rgb24', video_size=(width, height), framerate=10)
out = (
ffmpeg.input(
'pipe:0',
format='rawvideo',
pixel_format='rgb24',
video_size=(width, height),
framerate=10,
)
.trim(start_frame=start_frame)
.output('pipe:1', format='rawvideo')
)
args = out.get_args()
assert args == [
'-f', 'rawvideo',
'-video_size', '{}x{}'.format(width, height),
'-framerate', '10',
'-pixel_format', 'rgb24',
'-i', 'pipe:0',
'-f',
'rawvideo',
'-video_size',
'{}x{}'.format(width, height),
'-framerate',
'10',
'-pixel_format',
'rgb24',
'-i',
'pipe:0',
'-filter_complex',
'[0]trim=start_frame=2[s0]',
'-map', '[s0]',
'-f', 'rawvideo',
'pipe:1'
'[0]trim=start_frame=2[s0]',
'-map',
'[s0]',
'-f',
'rawvideo',
'pipe:1',
]
cmd = ['ffmpeg'] + args
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
in_data = bytes(bytearray([random.randint(0,255) for _ in range(frame_size * frame_count)]))
in_data = bytes(
bytearray([random.randint(0, 255) for _ in range(frame_size * frame_count)])
)
p.stdin.write(in_data) # note: this could block, in which case need to use threads
p.stdin.close()
out_data = p.stdout.read()
assert len(out_data) == frame_size * (frame_count - start_frame)
assert out_data == in_data[start_frame*frame_size:]
assert out_data == in_data[start_frame * frame_size :]
def test__probe():

View File

@ -13,6 +13,7 @@ importlib-metadata==0.17
Jinja2==2.10.1
MarkupSafe==1.1.1
more-itertools==7.0.0
numpy==1.16.4
packaging==19.0
pluggy==0.12.0
py==1.8.0

View File

@ -2,22 +2,26 @@ from setuptools import setup
from textwrap import dedent
version = '0.1.17'
download_url = 'https://github.com/kkroening/ffmpeg-python/archive/v{}.zip'.format(version)
download_url = 'https://github.com/kkroening/ffmpeg-python/archive/v{}.zip'.format(
version
)
long_description = dedent("""\
long_description = dedent(
'''\
ffmpeg-python: Python bindings for FFmpeg
=========================================
:Github: https://github.com/kkroening/ffmpeg-python
:API Reference: https://kkroening.github.io/ffmpeg-python/
""")
'''
)
file_formats = [
'aac',
'ac3',
'avi',
'bmp'
'bmp',
'flac',
'gif',
'mov',
@ -70,11 +74,12 @@ setup(
extras_require={
'dev': [
'future==0.17.1',
'numpy==1.16.4',
'pytest-mock==1.10.4',
'pytest==4.6.1',
'Sphinx==2.1.0',
'tox==3.12.1',
],
]
},
classifiers=[
'Intended Audience :: Developers',