From fc946be164712e67dfe7f4da10ab637d75f6958c Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Tue, 4 Jul 2017 17:45:07 -0600 Subject: [PATCH 01/15] Pull in _NodeBase from actorgraph; include short-hash in repr --- ffmpeg/nodes.py | 86 +++++++++++++++++++++++++++++++------ ffmpeg/tests/test_ffmpeg.py | 12 +++--- 2 files changed, 80 insertions(+), 18 deletions(-) diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 11bec59..fe4b130 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -1,25 +1,72 @@ from __future__ import unicode_literals from builtins import object +import copy import hashlib -import json -class Node(object): - """Node base""" - def __init__(self, parents, name, *args, **kwargs): +def _recursive_repr(item): + """Hack around python `repr` to deterministically represent dictionaries. + + This is able to represent more things than json.dumps, since it does not require things to be JSON serializable + (e.g. datetimes). + """ + if isinstance(item, basestring): + result = str(item) + elif isinstance(item, list): + result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item])) + elif isinstance(item, dict): + kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)] + result = '{' + ', '.join(kv_pairs) + '}' + else: + result = repr(item) + return result + + +def _create_hash(item): + hasher = hashlib.sha224() + repr_ = _recursive_repr(item) + hasher.update(repr_.encode('utf-8')) + return hasher.hexdigest() + + +class _NodeBase(object): + @property + def hash(self): + if self._hash is None: + self._update_hash() + return self._hash + + def __init__(self, parents, name): parent_hashes = [hash(parent) for parent in parents] assert len(parent_hashes) == len(set(parent_hashes)), 'Same node cannot be included as parent multiple times' self._parents = parents self._hash = None self._name = name - self._args = args - self._kwargs = kwargs + + def _transplant(self, new_parents): + other = copy.copy(self) + other._parents = copy.copy(new_parents) + return other + + @property + def _repr_args(self): + raise NotImplementedError() + + @property + def _repr_kwargs(self): + raise NotImplementedError() + + @property + def _short_hash(self): + return '{:x}'.format(abs(hash(self)))[:12] def __repr__(self): - formatted_props = ['{}'.format(arg) for arg in self._args] - formatted_props += ['{}={!r}'.format(key, self._kwargs[key]) for key in sorted(self._kwargs)] - return '{}({})'.format(self._name, ','.join(formatted_props)) + args = self._repr_args + kwargs = self._repr_kwargs + formatted_props = ['{!r}'.format(arg) for arg in args] + formatted_props += ['{}={!r}'.format(key, kwargs[key]) for key in sorted(kwargs)] + return '{}({}) <{}>'.format(self._name, ', '.join(formatted_props), self._short_hash) def __hash__(self): if self._hash is None: @@ -30,9 +77,8 @@ class Node(object): return hash(self) == hash(other) def _update_hash(self): - props = {'args': self._args, 'kwargs': self._kwargs} - props_str = json.dumps(props, sort_keys=True).encode('utf-8') - my_hash = hashlib.md5(props_str).hexdigest() + props = {'args': self._repr_args, 'kwargs': self._repr_kwargs} + my_hash = _create_hash(props) parent_hashes = [str(hash(parent)) for parent in self._parents] hashes = parent_hashes + [my_hash] hashes_str = ','.join(hashes).encode('utf-8') @@ -40,6 +86,22 @@ class Node(object): self._hash = int(hash_str, base=16) +class Node(_NodeBase): + """Node base""" + def __init__(self, parents, name, *args, **kwargs): + super(Node, self).__init__(parents, name) + self._args = args + self._kwargs = kwargs + + @property + def _repr_args(self): + return self._args + + @property + def _repr_kwargs(self): + return self._kwargs + + class InputNode(Node): """InputNode type""" def __init__(self, name, *args, **kwargs): diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index f2c0521..7d80ff6 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -73,12 +73,12 @@ def test_repr(): trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60) concatted = ffmpeg.concat(trim1, trim2, trim3) output = ffmpeg.output(concatted, 'dummy2.mp4') - assert repr(in_file) == "input(filename={!r})".format('dummy.mp4') - assert repr(trim1) == "trim(end_frame=20,start_frame=10)" - assert repr(trim2) == "trim(end_frame=40,start_frame=30)" - assert repr(trim3) == "trim(end_frame=60,start_frame=50)" - assert repr(concatted) == "concat(n=3)" - assert repr(output) == "output(filename={!r})".format('dummy2.mp4') + assert repr(in_file) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file._short_hash) + assert repr(trim1) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1._short_hash) + assert repr(trim2) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2._short_hash) + assert repr(trim3) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3._short_hash) + assert repr(concatted) == "concat(n=3) <{}>".format(concatted._short_hash) + assert repr(output) == "output(filename={!r}) <{}>".format('dummy2.mp4', output._short_hash) def test_get_args_simple(): From 6a9a12e7183ae87de2ef1580a1fe72ea24b38ef0 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 5 Jul 2017 03:13:30 -0600 Subject: [PATCH 02/15] #17: move graph stuff to dag.py; add edge labelling --- ffmpeg/_run.py | 36 ++------ ffmpeg/dag.py | 178 ++++++++++++++++++++++++++++++++++++ ffmpeg/nodes.py | 111 +++------------------- ffmpeg/tests/test_ffmpeg.py | 12 +-- 4 files changed, 206 insertions(+), 131 deletions(-) create mode 100644 ffmpeg/dag.py diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 2f99a3f..ae055d6 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +from .dag import topo_sort from functools import reduce from past.builtins import basestring import copy @@ -34,8 +35,8 @@ def _convert_kwargs_to_cmd_line_args(kwargs): def _get_input_args(input_node): - if input_node._name == input.__name__: - kwargs = copy.copy(input_node._kwargs) + if input_node.name == input.__name__: + kwargs = copy.copy(input_node.kwargs) filename = kwargs.pop('filename') fmt = kwargs.pop('format', None) video_size = kwargs.pop('video_size', None) @@ -51,27 +52,6 @@ def _get_input_args(input_node): return args -def _topo_sort(start_node): - marked_nodes = [] - sorted_nodes = [] - child_map = {} - def visit(node, child): - assert node not in marked_nodes, 'Graph is not a DAG' - if child is not None: - if node not in child_map: - child_map[node] = [] - child_map[node].append(child) - if node not in sorted_nodes: - marked_nodes.append(node) - [visit(parent, node) for parent in node._parents] - marked_nodes.remove(node) - sorted_nodes.append(node) - unmarked_nodes = [start_node] - while unmarked_nodes: - visit(unmarked_nodes.pop(), None) - return sorted_nodes, child_map - - def _get_filter_spec(i, node, stream_name_map): stream_name = _get_stream_name('v{}'.format(i)) stream_name_map[node] = stream_name @@ -86,7 +66,7 @@ def _get_filter_arg(filter_nodes, stream_name_map): def _get_global_args(node): - if node._name == overwrite_output.__name__: + if node.name == overwrite_output.__name__: return ['-y'] else: assert False, 'Unsupported global node: {}'.format(node) @@ -94,12 +74,12 @@ def _get_global_args(node): def _get_output_args(node, stream_name_map): args = [] - if node._name != merge_outputs.__name__: + if node.name != merge_outputs.__name__: stream_name = stream_name_map[node._parents[0]] if stream_name != '[0]': args += ['-map', stream_name] - if node._name == output.__name__: - kwargs = copy.copy(node._kwargs) + if node.name == output.__name__: + kwargs = copy.copy(node.kwargs) filename = kwargs.pop('filename') fmt = kwargs.pop('format', None) if fmt: @@ -116,7 +96,7 @@ def get_args(node): """Get command-line arguments for ffmpeg.""" args = [] # TODO: group nodes together, e.g. `-i somefile -r somerate`. - sorted_nodes, child_map = _topo_sort(node) + sorted_nodes, child_map = topo_sort([node]) del(node) input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)] output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py new file mode 100644 index 0000000..b0c489a --- /dev/null +++ b/ffmpeg/dag.py @@ -0,0 +1,178 @@ +from builtins import object +from collections import namedtuple +import copy +import hashlib + + +def _recursive_repr(item): + """Hack around python `repr` to deterministically represent dictionaries. + + This is able to represent more things than json.dumps, since it does not require things to be JSON serializable + (e.g. datetimes). + """ + if isinstance(item, basestring): + result = str(item) + elif isinstance(item, list): + result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item])) + elif isinstance(item, dict): + kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)] + result = '{' + ', '.join(kv_pairs) + '}' + else: + result = repr(item) + return result + + +def _get_hash(item): + hasher = hashlib.sha224() + repr_ = _recursive_repr(item) + hasher.update(repr_.encode('utf-8')) + return hasher.hexdigest() + + +class DagNode(object): + """Node in a directed-acyclic graph (DAG). + + Edges: + DagNodes are connected by edges. An edge connects two nodes with a label for each side: + - ``upstream_node``: upstream/parent node + - ``upstream_label``: label on the outgoing side of the upstream node + - ``downstream_node``: downstream/child node + - ``downstream_label``: label on the incoming side of the downstream node + + For example, DagNode A may be connected to DagNode B with an edge labelled "foo" on A's side, and "bar" on B's + side: + + _____ _____ + | | | | + | A >[foo]---[bar]> B | + |_____| |_____| + + Edge labels may be integers or strings, and nodes cannot have more than one incoming edge with the same label. + + DagNodes may have any number of incoming edges and any number of outgoing edges. DagNodes keep track only of + their incoming edges, but the entire graph structure can be inferred by looking at the furthest downstream + nodes and working backwards. + + Hashing: + DagNodes must be hashable, and two nodes are considered to be equivalent if they have the same hash value. + + Nodes are immutable, and the hash should remain constant as a result. If a node with new contents is required, + create a new node and throw the old one away. + + String representation: + In order for graph visualization tools to show useful information, nodes must be representable as strings. The + ``repr`` operator should provide a more or less "full" representation of the node, and the ``short_repr`` + property should be a shortened, concise representation. + + Again, because nodes are immutable, the string representations should remain constant. + """ + def __hash__(self): + """Return an integer hash of the node.""" + raise NotImplementedError() + + def __eq__(self, other): + """Compare two nodes; implementations should return True if (and only if) hashes match.""" + raise NotImplementedError() + + def __repr__(self, other): + """Return a full string representation of the node.""" + raise NotImplementedError() + + @property + def short_repr(self): + """Return a partial/concise representation of the node.""" + raise NotImplementedError() + + @property + def incoming_edge_map(self): + """Provides information about all incoming edges that connect to this node. + + The edge map is a dictionary that maps an ``incoming_label`` to ``(outgoing_node, outgoing_label)``. Note that + implicity, ``incoming_node`` is ``self``. See "Edges" section above. + """ + raise NotImplementedError() + + +DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label']) + + +class KwargReprNode(DagNode): + """A DagNode that can be represented as a set of args+kwargs. + """ + def __get_hash(self): + hashes = self.__upstream_hashes + [self.__inner_hash] + hash_strs = [str(x) for x in hashes] + hashes_str = ','.join(hash_strs).encode('utf-8') + hash_str = hashlib.md5(hashes_str).hexdigest() + return int(hash_str, base=16) + + def __init__(self, incoming_edge_map, name, args, kwargs): + self.__incoming_edge_map = incoming_edge_map + self.name = name + self.args = args + self.kwargs = kwargs + self.__hash = self.__get_hash() + + @property + def __upstream_hashes(self): + hashes = [] + for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items(): + hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label]] + return hashes + + @property + def __inner_hash(self): + props = {'args': self.args, 'kwargs': self.kwargs} + return _get_hash(props) + + def __hash__(self): + return self.__hash + + def __eq__(self, other): + return hash(self) == hash(other) + + @property + def short_hash(self): + return '{:x}'.format(abs(hash(self)))[:12] + + def __repr__(self): + formatted_props = ['{!r}'.format(arg) for arg in self.args] + formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)] + return '{}({}) <{}>'.format(self.name, ', '.join(formatted_props), self.short_hash) + + @property + def incoming_edges(self): + edges = [] + for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items(): + downstream_node = self + edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] + return edges + + @property + def incoming_edge_map(self): + return self.__incoming_edge_map + + @property + def short_repr(self): + return self.name + + +def topo_sort(start_nodes): + marked_nodes = [] + sorted_nodes = [] + child_map = {} + def visit(node, child): + assert node not in marked_nodes, 'Graph is not a DAG' + if child is not None: + if node not in child_map: + child_map[node] = [] + child_map[node].append(child) + if node not in sorted_nodes: + marked_nodes.append(node) + [visit(parent, node) for parent in node._parents] + marked_nodes.remove(node) + sorted_nodes.append(node) + unmarked_nodes = list(copy.copy(start_nodes)) + while unmarked_nodes: + visit(unmarked_nodes.pop(), None) + return sorted_nodes, child_map diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index fe4b130..bf95e08 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -1,105 +1,22 @@ from __future__ import unicode_literals -from builtins import object -import copy -import hashlib +from .dag import KwargReprNode -def _recursive_repr(item): - """Hack around python `repr` to deterministically represent dictionaries. - - This is able to represent more things than json.dumps, since it does not require things to be JSON serializable - (e.g. datetimes). - """ - if isinstance(item, basestring): - result = str(item) - elif isinstance(item, list): - result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item])) - elif isinstance(item, dict): - kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)] - result = '{' + ', '.join(kv_pairs) + '}' - else: - result = repr(item) - return result - - -def _create_hash(item): - hasher = hashlib.sha224() - repr_ = _recursive_repr(item) - hasher.update(repr_.encode('utf-8')) - return hasher.hexdigest() - - -class _NodeBase(object): - @property - def hash(self): - if self._hash is None: - self._update_hash() - return self._hash - - def __init__(self, parents, name): - parent_hashes = [hash(parent) for parent in parents] - assert len(parent_hashes) == len(set(parent_hashes)), 'Same node cannot be included as parent multiple times' - self._parents = parents - self._hash = None - self._name = name - - def _transplant(self, new_parents): - other = copy.copy(self) - other._parents = copy.copy(new_parents) - return other - - @property - def _repr_args(self): - raise NotImplementedError() - - @property - def _repr_kwargs(self): - raise NotImplementedError() - - @property - def _short_hash(self): - return '{:x}'.format(abs(hash(self)))[:12] - - def __repr__(self): - args = self._repr_args - kwargs = self._repr_kwargs - formatted_props = ['{!r}'.format(arg) for arg in args] - formatted_props += ['{}={!r}'.format(key, kwargs[key]) for key in sorted(kwargs)] - return '{}({}) <{}>'.format(self._name, ', '.join(formatted_props), self._short_hash) - - def __hash__(self): - if self._hash is None: - self._update_hash() - return self._hash - - def __eq__(self, other): - return hash(self) == hash(other) - - def _update_hash(self): - props = {'args': self._repr_args, 'kwargs': self._repr_kwargs} - my_hash = _create_hash(props) - parent_hashes = [str(hash(parent)) for parent in self._parents] - hashes = parent_hashes + [my_hash] - hashes_str = ','.join(hashes).encode('utf-8') - hash_str = hashlib.md5(hashes_str).hexdigest() - self._hash = int(hash_str, base=16) - - -class Node(_NodeBase): +class Node(KwargReprNode): """Node base""" def __init__(self, parents, name, *args, **kwargs): - super(Node, self).__init__(parents, name) - self._args = args - self._kwargs = kwargs + incoming_edge_map = {} + for downstream_label, parent in enumerate(parents): + upstream_label = 0 # assume nodes have a single output (FIXME) + upstream_node = parent + incoming_edge_map[downstream_label] = (upstream_node, upstream_label) + super(Node, self).__init__(incoming_edge_map, name, args, kwargs) @property - def _repr_args(self): - return self._args - - @property - def _repr_kwargs(self): - return self._kwargs + def _parents(self): + # TODO: change graph compilation to use `self.incoming_edges` instead. + return [edge.upstream_node for edge in self.incoming_edges] class InputNode(Node): @@ -111,9 +28,9 @@ class InputNode(Node): class FilterNode(Node): """FilterNode""" def _get_filter(self): - params_text = self._name - arg_params = ['{}'.format(arg) for arg in self._args] - kwarg_params = ['{}={}'.format(k, self._kwargs[k]) for k in sorted(self._kwargs)] + params_text = self.name + arg_params = ['{}'.format(arg) for arg in self.args] + kwarg_params = ['{}={}'.format(k, self.kwargs[k]) for k in sorted(self.kwargs)] params = arg_params + kwarg_params if params: params_text += '={}'.format(':'.join(params)) diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 7d80ff6..190e2f9 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -73,12 +73,12 @@ def test_repr(): trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60) concatted = ffmpeg.concat(trim1, trim2, trim3) output = ffmpeg.output(concatted, 'dummy2.mp4') - assert repr(in_file) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file._short_hash) - assert repr(trim1) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1._short_hash) - assert repr(trim2) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2._short_hash) - assert repr(trim3) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3._short_hash) - assert repr(concatted) == "concat(n=3) <{}>".format(concatted._short_hash) - assert repr(output) == "output(filename={!r}) <{}>".format('dummy2.mp4', output._short_hash) + assert repr(in_file) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file.short_hash) + assert repr(trim1) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1.short_hash) + assert repr(trim2) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2.short_hash) + assert repr(trim3) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3.short_hash) + assert repr(concatted) == "concat(n=3) <{}>".format(concatted.short_hash) + assert repr(output) == "output(filename={!r}) <{}>".format('dummy2.mp4', output.short_hash) def test_get_args_simple(): From 7236984626233ca0a1320d836a0c8fff65849955 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 5 Jul 2017 03:31:29 -0600 Subject: [PATCH 03/15] #17: don't rely on in --- ffmpeg/dag.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index 1caeca3..181ec8e 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -170,7 +170,8 @@ def topo_sort(start_nodes): child_map[node].append(child) if node not in sorted_nodes: marked_nodes.append(node) - [visit(parent, node) for parent in node._parents] + parents = [edge.upstream_node for edge in node.incoming_edges] + [visit(parent, node) for parent in parents] marked_nodes.remove(node) sorted_nodes.append(node) unmarked_nodes = list(copy.copy(start_nodes)) From fc07f6c4fad4fab905eabb3a0106a78b2bd4c82e Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 5 Jul 2017 04:07:30 -0600 Subject: [PATCH 04/15] #17: remove `Node._parents` --- ffmpeg/_run.py | 7 +++--- ffmpeg/dag.py | 59 +++++++++++++++++++++++++++++++------------------ ffmpeg/nodes.py | 5 ----- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 533160e..09a3ae5 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -55,7 +55,7 @@ def _get_input_args(input_node): def _get_filter_spec(i, node, stream_name_map): stream_name = _get_stream_name('v{}'.format(i)) stream_name_map[node] = stream_name - inputs = [stream_name_map[parent] for parent in node._parents] + inputs = [stream_name_map[edge.upstream_node] for edge in node.incoming_edges] filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(), stream_name) return filter_spec @@ -75,7 +75,8 @@ def _get_global_args(node): def _get_output_args(node, stream_name_map): args = [] if node.name != merge_outputs.__name__: - stream_name = stream_name_map[node._parents[0]] + assert len(node.incoming_edges) == 1 + stream_name = stream_name_map[node.incoming_edges[0].upstream_node] if stream_name != '[0]': args += ['-map', stream_name] if node.name == output.__name__: @@ -96,7 +97,7 @@ def get_args(node): """Get command-line arguments for ffmpeg.""" args = [] # TODO: group nodes together, e.g. `-i somefile -r somerate`. - sorted_nodes, child_map = topo_sort([node]) + sorted_nodes, outgoing_edge_maps = topo_sort([node]) del(node) input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)] output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index 181ec8e..25e8d21 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -96,6 +96,20 @@ class DagNode(object): DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label']) +def get_incoming_edges(downstream_node, incoming_edge_map): + edges = [] + for downstream_label, (upstream_node, upstream_label) in incoming_edge_map.items(): + edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] + return edges + + +def get_outgoing_edges(upstream_node, outgoing_edge_map): + edges = [] + for upstream_label, (downstream_node, downstream_label) in outgoing_edge_map: + edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] + return edges + + class KwargReprNode(DagNode): """A DagNode that can be represented as a set of args+kwargs. """ @@ -142,11 +156,7 @@ class KwargReprNode(DagNode): @property def incoming_edges(self): - edges = [] - for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items(): - downstream_node = self - edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] - return edges + return get_incoming_edges(self, self.incoming_edge_map) @property def incoming_edge_map(self): @@ -157,24 +167,29 @@ class KwargReprNode(DagNode): return self.name -def topo_sort(start_nodes): +def topo_sort(downstream_nodes): marked_nodes = [] sorted_nodes = [] - child_map = {} - def visit(node, child): - if node in marked_nodes: + outgoing_edge_maps = {} + + def visit(upstream_node, upstream_label, downstream_node, downstream_label): + if upstream_node in marked_nodes: raise RuntimeError('Graph is not a DAG') - if child is not None: - if node not in child_map: - child_map[node] = [] - child_map[node].append(child) - if node not in sorted_nodes: - marked_nodes.append(node) - parents = [edge.upstream_node for edge in node.incoming_edges] - [visit(parent, node) for parent in parents] - marked_nodes.remove(node) - sorted_nodes.append(node) - unmarked_nodes = list(copy.copy(start_nodes)) + + if downstream_node is not None: + if upstream_node not in outgoing_edge_maps: + outgoing_edge_maps[upstream_node] = {} + outgoing_edge_maps[upstream_node][upstream_label] = (downstream_node, downstream_label) + + if upstream_node not in sorted_nodes: + marked_nodes.append(upstream_node) + for edge in upstream_node.incoming_edges: + visit(edge.upstream_node, edge.upstream_label, edge.downstream_node, edge.downstream_label) + marked_nodes.remove(upstream_node) + sorted_nodes.append(upstream_node) + + unmarked_nodes = [(node, 0) for node in downstream_nodes] while unmarked_nodes: - visit(unmarked_nodes.pop(), None) - return sorted_nodes, child_map + upstream_node, upstream_label = unmarked_nodes.pop() + visit(upstream_node, upstream_label, None, None) + return sorted_nodes, outgoing_edge_maps diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 0d00437..67415f8 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -13,11 +13,6 @@ class Node(KwargReprNode): incoming_edge_map[downstream_label] = (upstream_node, upstream_label) super(Node, self).__init__(incoming_edge_map, name, args, kwargs) - @property - def _parents(self): - # TODO: change graph compilation to use `self.incoming_edges` instead. - return [edge.upstream_node for edge in self.incoming_edges] - class InputNode(Node): """InputNode type""" From 11a24d043249c1dacdca5bcde6faa74116d066e3 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 5 Jul 2017 04:23:05 -0600 Subject: [PATCH 05/15] #17: fix `get_outgoing_edges` --- ffmpeg/dag.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index 25e8d21..f432d84 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -1,6 +1,5 @@ from builtins import object from collections import namedtuple -import copy import hashlib @@ -105,7 +104,7 @@ def get_incoming_edges(downstream_node, incoming_edge_map): def get_outgoing_edges(upstream_node, outgoing_edge_map): edges = [] - for upstream_label, (downstream_node, downstream_label) in outgoing_edge_map: + for upstream_label, (downstream_node, downstream_label) in outgoing_edge_map.items(): edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] return edges From aa5156d9c9e41f5dc844a9dbd97b76f981eda755 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 5 Jul 2017 04:52:37 -0600 Subject: [PATCH 06/15] #17: allow multiple outgoing edges with same label --- ffmpeg/dag.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index f432d84..f8baa9f 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -104,8 +104,9 @@ def get_incoming_edges(downstream_node, incoming_edge_map): def get_outgoing_edges(upstream_node, outgoing_edge_map): edges = [] - for upstream_label, (downstream_node, downstream_label) in outgoing_edge_map.items(): - edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] + for upstream_label, downstream_infos in outgoing_edge_map.items(): + for (downstream_node, downstream_label) in downstream_infos: + edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] return edges @@ -176,9 +177,11 @@ def topo_sort(downstream_nodes): raise RuntimeError('Graph is not a DAG') if downstream_node is not None: - if upstream_node not in outgoing_edge_maps: - outgoing_edge_maps[upstream_node] = {} - outgoing_edge_maps[upstream_node][upstream_label] = (downstream_node, downstream_label) + outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {}) + outgoing_edge_infos = outgoing_edge_map.get(upstream_label, []) + outgoing_edge_infos += [(downstream_node, downstream_label)] + outgoing_edge_map[upstream_label] = outgoing_edge_infos + outgoing_edge_maps[upstream_node] = outgoing_edge_map if upstream_node not in sorted_nodes: marked_nodes.append(upstream_node) From 6887ad8bac178483b7debcefa1c52de979ec0085 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 02:23:13 -0600 Subject: [PATCH 07/15] Massive refactor to break nodes into streams+nodes --- ffmpeg/__init__.py | 3 +- ffmpeg/_ffmpeg.py | 25 ++--- ffmpeg/_filters.py | 122 +++++++++++------------- ffmpeg/_run.py | 52 +++++------ ffmpeg/_utils.py | 27 ++++++ ffmpeg/dag.py | 66 +++++-------- ffmpeg/nodes.py | 179 ++++++++++++++++++++++++++++++++---- ffmpeg/tests/test_ffmpeg.py | 28 +++--- 8 files changed, 326 insertions(+), 176 deletions(-) create mode 100644 ffmpeg/_utils.py diff --git a/ffmpeg/__init__.py b/ffmpeg/__init__.py index 1953609..9f1bed7 100644 --- a/ffmpeg/__init__.py +++ b/ffmpeg/__init__.py @@ -3,4 +3,5 @@ from . import _filters, _ffmpeg, _run from ._filters import * from ._ffmpeg import * from ._run import * -__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ +from ._view import * +__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ + _view.__all__ diff --git a/ffmpeg/_ffmpeg.py b/ffmpeg/_ffmpeg.py index 9cb2ce7..8dc6c87 100644 --- a/ffmpeg/_ffmpeg.py +++ b/ffmpeg/_ffmpeg.py @@ -1,10 +1,11 @@ from __future__ import unicode_literals from .nodes import ( - FilterNode, + filter_operator, GlobalNode, InputNode, - operator, + MergeOutputsNode, OutputNode, + output_operator, ) @@ -19,27 +20,27 @@ def input(filename, **kwargs): if 'format' in kwargs: raise ValueError("Can't specify both `format` and `f` kwargs") kwargs['format'] = fmt - return InputNode(input.__name__, **kwargs) + return InputNode(input.__name__, kwargs=kwargs).stream() -@operator(node_classes={OutputNode, GlobalNode}) -def overwrite_output(parent_node): +@output_operator() +def overwrite_output(stream): """Overwrite output files without asking (ffmpeg ``-y`` option) Official documentation: `Main options `__ """ - return GlobalNode(parent_node, overwrite_output.__name__) + return GlobalNode(stream, overwrite_output.__name__).stream() -@operator(node_classes={OutputNode}) -def merge_outputs(*parent_nodes): +@output_operator() +def merge_outputs(*streams): """Include all given outputs in one ffmpeg command line """ - return OutputNode(parent_nodes, merge_outputs.__name__) + return MergeOutputsNode(streams, merge_outputs.__name__).stream() -@operator(node_classes={InputNode, FilterNode}) -def output(parent_node, filename, **kwargs): +@filter_operator() +def output(stream, filename, **kwargs): """Output file URL Official documentation: `Synopsis `__ @@ -50,7 +51,7 @@ def output(parent_node, filename, **kwargs): if 'format' in kwargs: raise ValueError("Can't specify both `format` and `f` kwargs") kwargs['format'] = fmt - return OutputNode([parent_node], output.__name__, **kwargs) + return OutputNode(stream, output.__name__, kwargs=kwargs).stream() diff --git a/ffmpeg/_filters.py b/ffmpeg/_filters.py index 6329648..2883873 100644 --- a/ffmpeg/_filters.py +++ b/ffmpeg/_filters.py @@ -1,64 +1,55 @@ from __future__ import unicode_literals from .nodes import ( FilterNode, - operator, + filter_operator, ) -@operator() -def filter_(parent_node, filter_name, *args, **kwargs): - """Apply custom single-source filter. +@filter_operator() +def filter_multi_output(stream_spec, filter_name, *args, **kwargs): + """Apply custom filter with one or more outputs. + + This is the same as ``filter_`` except that the filter can produce more than one output. + + To reference an output stream, use either the ``.stream`` operator or bracket shorthand: + + Example: + + ``` + split = ffmpeg.input('in.mp4').filter_multi_output('split') + split0 = split.stream(0) + split1 = split[1] + ffmpeg.concat(split0, split1).output('out.mp4').run() + ``` + """ + return FilterNode(stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None) + + +@filter_operator() +def filter_(stream_spec, filter_name, *args, **kwargs): + """Apply custom filter. ``filter_`` is normally used by higher-level filter functions such as ``hflip``, but if a filter implementation is missing from ``fmpeg-python``, you can call ``filter_`` directly to have ``fmpeg-python`` pass the filter name and arguments to ffmpeg verbatim. Args: - parent_node: Source stream to apply filter to. + stream_spec: a Stream, list of Streams, or label-to-Stream dictionary mapping filter_name: ffmpeg filter name, e.g. `colorchannelmixer` *args: list of args to pass to ffmpeg verbatim **kwargs: list of keyword-args to pass to ffmpeg verbatim - This function is used internally by all of the other single-source filters (e.g. ``hflip``, ``crop``, etc.). - For custom multi-source filters, see ``filter_multi`` instead. - The function name is suffixed with ``_`` in order avoid confusion with the standard python ``filter`` function. Example: ``ffmpeg.input('in.mp4').filter_('hflip').output('out.mp4').run()`` """ - return FilterNode([parent_node], filter_name, *args, **kwargs) + return filter_multi_output(stream_spec, filter_name, *args, **kwargs).stream() -def filter_multi(parent_nodes, filter_name, *args, **kwargs): - """Apply custom multi-source filter. - - This is nearly identical to the ``filter`` function except that it allows filters to be applied to multiple - streams. It's normally used by higher-level filter functions such as ``concat``, but if a filter implementation - is missing from ``fmpeg-python``, you can call ``filter_multi`` directly. - - Note that because it applies to multiple streams, it can't be used as an operator, unlike the ``filter`` function - (e.g. ``ffmpeg.input('in.mp4').filter_('hflip')``) - - Args: - parent_nodes: List of source streams to apply filter to. - filter_name: ffmpeg filter name, e.g. `concat` - *args: list of args to pass to ffmpeg verbatim - **kwargs: list of keyword-args to pass to ffmpeg verbatim - - For custom single-source filters, see ``filter_multi`` instead. - - Example: - - ``ffmpeg.filter_multi(ffmpeg.input('in1.mp4'), ffmpeg.input('in2.mp4'), 'concat', n=2).output('out.mp4').run()`` - """ - return FilterNode(parent_nodes, filter_name, *args, **kwargs) - - - -@operator() -def setpts(parent_node, expr): +@filter_operator() +def setpts(stream, expr): """Change the PTS (presentation timestamp) of the input frames. Args: @@ -66,11 +57,11 @@ def setpts(parent_node, expr): Official documentation: `setpts, asetpts `__ """ - return filter_(parent_node, setpts.__name__, expr) + return FilterNode(stream, setpts.__name__, args=[expr]).stream() -@operator() -def trim(parent_node, **kwargs): +@filter_operator() +def trim(stream, **kwargs): """Trim the input so that the output contains one continuous subpart of the input. Args: @@ -88,10 +79,10 @@ def trim(parent_node, **kwargs): Official documentation: `trim `__ """ - return filter_(parent_node, trim.__name__, **kwargs) + return FilterNode(stream, trim.__name__, kwargs=kwargs).stream() -@operator() +@filter_operator() def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs): """Overlay one video on top of another. @@ -136,29 +127,29 @@ def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs Official documentation: `overlay `__ """ kwargs['eof_action'] = eof_action - return filter_multi([main_parent_node, overlay_parent_node], overlay.__name__, **kwargs) + return FilterNode([main_parent_node, overlay_parent_node], overlay.__name__, kwargs=kwargs, max_inputs=2).stream() -@operator() -def hflip(parent_node): +@filter_operator() +def hflip(stream): """Flip the input video horizontally. Official documentation: `hflip `__ """ - return filter_(parent_node, hflip.__name__) + return FilterNode(stream, hflip.__name__).stream() -@operator() -def vflip(parent_node): +@filter_operator() +def vflip(stream): """Flip the input video vertically. Official documentation: `vflip `__ """ - return filter_(parent_node, vflip.__name__) + return FilterNode(stream, vflip.__name__).stream() -@operator() -def drawbox(parent_node, x, y, width, height, color, thickness=None, **kwargs): +@filter_operator() +def drawbox(stream, x, y, width, height, color, thickness=None, **kwargs): """Draw a colored box on the input image. Args: @@ -179,11 +170,11 @@ def drawbox(parent_node, x, y, width, height, color, thickness=None, **kwargs): """ if thickness: kwargs['t'] = thickness - return filter_(parent_node, drawbox.__name__, x, y, width, height, color, **kwargs) + return FilterNode(stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs).stream() -@operator() -def concat(*parent_nodes, **kwargs): +@filter_operator() +def concat(*streams, **kwargs): """Concatenate audio and video streams, joining them together one after the other. The filter works on segments of synchronized video and audio streams. All segments must have the same number of @@ -208,12 +199,12 @@ def concat(*parent_nodes, **kwargs): Official documentation: `concat `__ """ - kwargs['n'] = len(parent_nodes) - return filter_multi(parent_nodes, concat.__name__, **kwargs) + kwargs['n'] = len(streams) + return FilterNode(streams, concat.__name__, kwargs=kwargs, max_inputs=None).stream() -@operator() -def zoompan(parent_node, **kwargs): +@filter_operator() +def zoompan(stream, **kwargs): """Apply Zoom & Pan effect. Args: @@ -228,11 +219,11 @@ def zoompan(parent_node, **kwargs): Official documentation: `zoompan `__ """ - return filter_(parent_node, zoompan.__name__, **kwargs) + return FilterNode(stream, zoompan.__name__, kwargs=kwargs).stream() -@operator() -def hue(parent_node, **kwargs): +@filter_operator() +def hue(stream, **kwargs): """Modify the hue and/or the saturation of the input. Args: @@ -243,16 +234,16 @@ def hue(parent_node, **kwargs): Official documentation: `hue `__ """ - return filter_(parent_node, hue.__name__, **kwargs) + return FilterNode(stream, hue.__name__, kwargs=kwargs).stream() -@operator() -def colorchannelmixer(parent_node, *args, **kwargs): +@filter_operator() +def colorchannelmixer(stream, *args, **kwargs): """Adjust video input frames by re-mixing color channels. Official documentation: `colorchannelmixer `__ """ - return filter_(parent_node, colorchannelmixer.__name__, **kwargs) + return FilterNode(stream, colorchannelmixer.__name__, kwargs=kwargs).stream() __all__ = [ @@ -260,7 +251,6 @@ __all__ = [ 'concat', 'drawbox', 'filter_', - 'filter_multi', 'hflip', 'hue', 'overlay', diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 09a3ae5..81acb47 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -4,22 +4,23 @@ from .dag import topo_sort from functools import reduce from past.builtins import basestring import copy -import operator as _operator +import operator import subprocess as _subprocess from ._ffmpeg import ( input, - merge_outputs, output, overwrite_output, ) from .nodes import ( GlobalNode, InputNode, - operator, OutputNode, + output_operator, + Stream, ) + def _get_stream_name(name): return '[{}]'.format(name) @@ -73,32 +74,31 @@ def _get_global_args(node): def _get_output_args(node, stream_name_map): + if node.name != output.__name__: + raise ValueError('Unsupported output node: {}'.format(node)) args = [] - if node.name != merge_outputs.__name__: - assert len(node.incoming_edges) == 1 - stream_name = stream_name_map[node.incoming_edges[0].upstream_node] - if stream_name != '[0]': - args += ['-map', stream_name] - if node.name == output.__name__: - kwargs = copy.copy(node.kwargs) - filename = kwargs.pop('filename') - fmt = kwargs.pop('format', None) - if fmt: - args += ['-f', fmt] - args += _convert_kwargs_to_cmd_line_args(kwargs) - args += [filename] - else: - raise ValueError('Unsupported output node: {}'.format(node)) + assert len(node.incoming_edges) == 1 + stream_name = stream_name_map[node.incoming_edges[0].upstream_node] + if stream_name != '[0]': + args += ['-map', stream_name] + kwargs = copy.copy(node.kwargs) + filename = kwargs.pop('filename') + fmt = kwargs.pop('format', None) + if fmt: + args += ['-f', fmt] + args += _convert_kwargs_to_cmd_line_args(kwargs) + args += [filename] return args -@operator(node_classes={OutputNode, GlobalNode}) -def get_args(node): +@output_operator() +def get_args(stream): """Get command-line arguments for ffmpeg.""" + if not isinstance(stream, Stream): + raise TypeError('Expected Stream; got {}'.format(type(stream))) args = [] # TODO: group nodes together, e.g. `-i somefile -r somerate`. - sorted_nodes, outgoing_edge_maps = topo_sort([node]) - del(node) + sorted_nodes, outgoing_edge_maps = topo_sort([stream.node]) input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)] output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not isinstance(node, GlobalNode)] @@ -106,15 +106,15 @@ def get_args(node): filter_nodes = [node for node in sorted_nodes if node not in (input_nodes + output_nodes + global_nodes)] stream_name_map = {node: _get_stream_name(i) for i, node in enumerate(input_nodes)} filter_arg = _get_filter_arg(filter_nodes, stream_name_map) - args += reduce(_operator.add, [_get_input_args(node) for node in input_nodes]) + args += reduce(operator.add, [_get_input_args(node) for node in input_nodes]) if filter_arg: args += ['-filter_complex', filter_arg] - args += reduce(_operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]) - args += reduce(_operator.add, [_get_global_args(node) for node in global_nodes], []) + args += reduce(operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]) + args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], []) return args -@operator(node_classes={OutputNode, GlobalNode}) +@output_operator() def run(node, cmd='ffmpeg'): """Run ffmpeg on node graph.""" if isinstance(cmd, basestring): diff --git a/ffmpeg/_utils.py b/ffmpeg/_utils.py new file mode 100644 index 0000000..04f2add --- /dev/null +++ b/ffmpeg/_utils.py @@ -0,0 +1,27 @@ +import hashlib + + +def _recursive_repr(item): + """Hack around python `repr` to deterministically represent dictionaries. + + This is able to represent more things than json.dumps, since it does not require things to be JSON serializable + (e.g. datetimes). + """ + if isinstance(item, basestring): + result = str(item) + elif isinstance(item, list): + result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item])) + elif isinstance(item, dict): + kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)] + result = '{' + ', '.join(kv_pairs) + '}' + else: + result = repr(item) + return result + + +def get_hash(item): + repr_ = _recursive_repr(item).encode('utf-8') + return hashlib.md5(repr_).hexdigest() + +def get_hash_int(item): + return int(get_hash(item), base=16) diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index f8baa9f..c8a634e 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -1,31 +1,6 @@ +from ._utils import get_hash, get_hash_int from builtins import object from collections import namedtuple -import hashlib - - -def _recursive_repr(item): - """Hack around python `repr` to deterministically represent dictionaries. - - This is able to represent more things than json.dumps, since it does not require things to be JSON serializable - (e.g. datetimes). - """ - if isinstance(item, basestring): - result = str(item) - elif isinstance(item, list): - result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item])) - elif isinstance(item, dict): - kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)] - result = '{' + ', '.join(kv_pairs) + '}' - else: - result = repr(item) - return result - - -def _get_hash(item): - hasher = hashlib.sha224() - repr_ = _recursive_repr(item) - hasher.update(repr_.encode('utf-8')) - return hasher.hexdigest() class DagNode(object): @@ -113,20 +88,6 @@ def get_outgoing_edges(upstream_node, outgoing_edge_map): class KwargReprNode(DagNode): """A DagNode that can be represented as a set of args+kwargs. """ - def __get_hash(self): - hashes = self.__upstream_hashes + [self.__inner_hash] - hash_strs = [str(x) for x in hashes] - hashes_str = ','.join(hash_strs).encode('utf-8') - hash_str = hashlib.md5(hashes_str).hexdigest() - return int(hash_str, base=16) - - def __init__(self, incoming_edge_map, name, args, kwargs): - self.__incoming_edge_map = incoming_edge_map - self.name = name - self.args = args - self.kwargs = kwargs - self.__hash = self.__get_hash() - @property def __upstream_hashes(self): hashes = [] @@ -137,7 +98,18 @@ class KwargReprNode(DagNode): @property def __inner_hash(self): props = {'args': self.args, 'kwargs': self.kwargs} - return _get_hash(props) + return get_hash(props) + + def __get_hash(self): + hashes = self.__upstream_hashes + [self.__inner_hash] + return get_hash_int(hashes) + + def __init__(self, incoming_edge_map, name, args, kwargs): + self.__incoming_edge_map = incoming_edge_map + self.name = name + self.args = args + self.kwargs = kwargs + self.__hash = self.__get_hash() def __hash__(self): return self.__hash @@ -149,10 +121,16 @@ class KwargReprNode(DagNode): def short_hash(self): return '{:x}'.format(abs(hash(self)))[:12] - def __repr__(self): + def long_repr(self, include_hash=True): formatted_props = ['{!r}'.format(arg) for arg in self.args] formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)] - return '{}({}) <{}>'.format(self.name, ', '.join(formatted_props), self.short_hash) + out = '{}({})'.format(self.name, ', '.join(formatted_props)) + if include_hash: + out += ' <{}>'.format(self.short_hash) + return out + + def __repr__(self): + return self.long_repr() @property def incoming_edges(self): @@ -190,7 +168,7 @@ def topo_sort(downstream_nodes): marked_nodes.remove(upstream_node) sorted_nodes.append(upstream_node) - unmarked_nodes = [(node, 0) for node in downstream_nodes] + unmarked_nodes = [(node, None) for node in downstream_nodes] while unmarked_nodes: upstream_node, upstream_label = unmarked_nodes.pop() visit(upstream_node, upstream_label, None, None) diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 67415f8..b458fae 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -1,27 +1,133 @@ from __future__ import unicode_literals from .dag import KwargReprNode +from ._utils import get_hash_int + + +def _is_of_types(obj, types): + valid = False + for stream_type in types: + if isinstance(obj, stream_type): + valid = True + break + return valid + + +def _get_types_str(types): + return ', '.join(['{}.{}'.format(x.__module__, x.__name__) for x in types]) + + +class Stream(object): + """Represents the outgoing edge of an upstream node; may be used to create more downstream nodes.""" + def __init__(self, upstream_node, upstream_label, node_types): + if not _is_of_types(upstream_node, node_types): + raise TypeError('Expected upstream node to be of one of the following type(s): {}; got {}'.format( + _get_types_str(node_types), type(upstream_node))) + self.node = upstream_node + self.label = upstream_label + + def __hash__(self): + return get_hash_int([hash(self.node), hash(self.label)]) + + def __eq__(self, other): + return hash(self) == hash(other) + + def __repr__(self): + node_repr = self.node.long_repr(include_hash=False) + out = '{}[{!r}] <{}>'.format(node_repr, self.label, self.node.short_hash) + return out class Node(KwargReprNode): """Node base""" - def __init__(self, parents, name, *args, **kwargs): + @classmethod + def __check_input_len(cls, stream_map, min_inputs, max_inputs): + if min_inputs is not None and len(stream_map) < min_inputs: + raise ValueError('Expected at least {} input stream(s); got {}'.format(min_inputs, len(stream_map))) + elif max_inputs is not None and len(stream_map) > max_inputs: + raise ValueError('Expected at most {} input stream(s); got {}'.format(max_inputs, len(stream_map))) + + @classmethod + def __check_input_types(cls, stream_map, incoming_stream_types): + for stream in stream_map.values(): + if not _is_of_types(stream, incoming_stream_types): + raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}' + .format(_get_types_str(incoming_stream_types), type(stream))) + + @classmethod + def __get_stream_map(cls, stream_spec): + if stream_spec is None: + stream_map = {} + elif isinstance(stream_spec, Stream): + stream_map = {None: stream_spec} + elif isinstance(stream_spec, (list, tuple)): + stream_map = dict(enumerate(stream_spec)) + elif isinstance(stream_spec, dict): + stream_map = stream_spec + return stream_map + + @classmethod + def __get_incoming_edge_map(cls, stream_map): incoming_edge_map = {} - for downstream_label, parent in enumerate(parents): - upstream_label = 0 # assume nodes have a single output (FIXME) - upstream_node = parent - incoming_edge_map[downstream_label] = (upstream_node, upstream_label) + for downstream_label, upstream in stream_map.items(): + incoming_edge_map[downstream_label] = (upstream.node, upstream.label) + return incoming_edge_map + + def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args, + kwargs): + stream_map = self.__get_stream_map(stream_spec) + self.__check_input_len(stream_map, min_inputs, max_inputs) + self.__check_input_types(stream_map, incoming_stream_types) + incoming_edge_map = self.__get_incoming_edge_map(stream_map) super(Node, self).__init__(incoming_edge_map, name, args, kwargs) + self.__outgoing_stream_type = outgoing_stream_type + + def stream(self, label=None): + """Create an outgoing stream originating from this node. + + More nodes may be attached onto the outgoing stream. + """ + return self.__outgoing_stream_type(self, label) + + def __getitem__(self, label): + """Create an outgoing stream originating from this node; syntactic sugar for ``self.stream(label)``. + """ + return self.stream(label) + + +class FilterableStream(Stream): + def __init__(self, upstream_node, upstream_label): + super(FilterableStream, self).__init__(upstream_node, upstream_label, {InputNode, FilterNode}) class InputNode(Node): """InputNode type""" - def __init__(self, name, *args, **kwargs): - super(InputNode, self).__init__(parents=[], name=name, *args, **kwargs) + def __init__(self, name, args=[], kwargs={}): + super(InputNode, self).__init__( + stream_spec=None, + name=name, + incoming_stream_types={}, + outgoing_stream_type=FilterableStream, + min_inputs=0, + max_inputs=0, + args=args, + kwargs=kwargs + ) class FilterNode(Node): - """FilterNode""" + def __init__(self, stream_spec, name, max_inputs=1, args=[], kwargs={}): + super(FilterNode, self).__init__( + stream_spec=stream_spec, + name=name, + incoming_stream_types={FilterableStream}, + outgoing_stream_type=FilterableStream, + min_inputs=1, + max_inputs=max_inputs, + args=args, + kwargs=kwargs + ) + def _get_filter(self): params_text = self.name arg_params = ['{}'.format(arg) for arg in self.args] @@ -33,20 +139,61 @@ class FilterNode(Node): class OutputNode(Node): - """OutputNode""" - pass + def __init__(self, stream, name, args=[], kwargs={}): + super(OutputNode, self).__init__( + stream_spec=stream, + name=name, + incoming_stream_types={FilterableStream}, + outgoing_stream_type=OutputStream, + min_inputs=1, + max_inputs=1, + args=args, + kwargs=kwargs + ) + + +class OutputStream(Stream): + def __init__(self, upstream_node, upstream_label): + super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode}) + + +class MergeOutputsNode(Node): + def __init__(self, stream, name): + super(MergeOutputsNode, self).__init__( + stream_spec=None, + name=name, + incoming_stream_types={OutputStream}, + outgoing_stream_type=OutputStream, + min_inputs=1, + max_inputs=None + ) class GlobalNode(Node): - def __init__(self, parent, name, *args, **kwargs): - if not isinstance(parent, OutputNode): - raise RuntimeError('Global nodes can only be attached after output nodes') - super(GlobalNode, self).__init__([parent], name, *args, **kwargs) + def __init__(self, stream, name, args=[], kwargs={}): + super(GlobalNode, self).__init__( + stream_spec=stream, + name=name, + incoming_stream_types={OutputStream}, + outgoing_stream_type=OutputStream, + min_inputs=1, + max_inputs=1, + args=args, + kwargs=kwargs + ) -def operator(node_classes={Node}, name=None): +def stream_operator(stream_classes={Stream}, name=None): def decorator(func): func_name = name or func.__name__ - [setattr(node_class, func_name, func) for node_class in node_classes] + [setattr(stream_class, func_name, func) for stream_class in stream_classes] return func return decorator + + +def filter_operator(name=None): + return stream_operator(stream_classes={FilterableStream}, name=name) + + +def output_operator(name=None): + return stream_operator(stream_classes={OutputStream}, name=name) diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 190e2f9..cd52837 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -39,11 +39,8 @@ def test_fluent_concat(): concat1 = ffmpeg.concat(trimmed1, trimmed2, trimmed3) concat2 = ffmpeg.concat(trimmed1, trimmed2, trimmed3) concat3 = ffmpeg.concat(trimmed1, trimmed3, trimmed2) - concat4 = ffmpeg.concat() - concat5 = ffmpeg.concat() assert concat1 == concat2 assert concat1 != concat3 - assert concat4 == concat5 def test_fluent_output(): @@ -66,19 +63,28 @@ def test_fluent_complex_filter(): ) -def test_repr(): +def test_node_repr(): in_file = ffmpeg.input('dummy.mp4') trim1 = ffmpeg.trim(in_file, start_frame=10, end_frame=20) trim2 = ffmpeg.trim(in_file, start_frame=30, end_frame=40) trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60) concatted = ffmpeg.concat(trim1, trim2, trim3) output = ffmpeg.output(concatted, 'dummy2.mp4') - assert repr(in_file) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file.short_hash) - assert repr(trim1) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1.short_hash) - assert repr(trim2) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2.short_hash) - assert repr(trim3) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3.short_hash) - assert repr(concatted) == "concat(n=3) <{}>".format(concatted.short_hash) - assert repr(output) == "output(filename={!r}) <{}>".format('dummy2.mp4', output.short_hash) + assert repr(in_file.node) == "input(filename={!r}) <{}>".format('dummy.mp4', in_file.node.short_hash) + assert repr(trim1.node) == "trim(end_frame=20, start_frame=10) <{}>".format(trim1.node.short_hash) + assert repr(trim2.node) == "trim(end_frame=40, start_frame=30) <{}>".format(trim2.node.short_hash) + assert repr(trim3.node) == "trim(end_frame=60, start_frame=50) <{}>".format(trim3.node.short_hash) + assert repr(concatted.node) == "concat(n=3) <{}>".format(concatted.node.short_hash) + assert repr(output.node) == "output(filename={!r}) <{}>".format('dummy2.mp4', output.node.short_hash) + + +def test_stream_repr(): + in_file = ffmpeg.input('dummy.mp4') + assert repr(in_file) == "input(filename={!r})[None] <{}>".format('dummy.mp4', in_file.node.short_hash) + split0 = in_file.filter_multi_output('split')[0] + assert repr(split0) == "split()[0] <{}>".format(split0.node.short_hash) + dummy_out = in_file.filter_multi_output('dummy')['out'] + assert repr(dummy_out) == "dummy()[{!r}] <{}>".format(dummy_out.label, dummy_out.node.short_hash) def test_get_args_simple(): @@ -201,7 +207,7 @@ def test_pipe(): p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) in_data = bytes(bytearray([random.randint(0,255) for _ in range(frame_size * frame_count)])) - p.stdin.write(in_data) # note: this could block, in which case need to use threads + p.stdin.write(in_data) # note: this could block, in which case need to use threads p.stdin.close() out_data = p.stdout.read() From b7fc33172246e449d77b43c1cdcb43aad4cd474d Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 02:32:45 -0600 Subject: [PATCH 08/15] Add `split` operator --- ffmpeg/_filters.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ffmpeg/_filters.py b/ffmpeg/_filters.py index 2883873..79949ab 100644 --- a/ffmpeg/_filters.py +++ b/ffmpeg/_filters.py @@ -48,6 +48,11 @@ def filter_(stream_spec, filter_name, *args, **kwargs): return filter_multi_output(stream_spec, filter_name, *args, **kwargs).stream() +@filter_operator() +def split(stream): + return FilterNode(stream, split.__name__) + + @filter_operator() def setpts(stream, expr): """Change the PTS (presentation timestamp) of the input frames. From b548092d480871e402e2d50ab96d864c5851cab2 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 02:49:05 -0600 Subject: [PATCH 09/15] #17: fix __init__ changes --- ffmpeg/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ffmpeg/__init__.py b/ffmpeg/__init__.py index 9f1bed7..1953609 100644 --- a/ffmpeg/__init__.py +++ b/ffmpeg/__init__.py @@ -3,5 +3,4 @@ from . import _filters, _ffmpeg, _run from ._filters import * from ._ffmpeg import * from ._run import * -from ._view import * -__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ + _view.__all__ +__all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ From 662c56eb5b228b7fbf301035c074749dc9eced29 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 02:50:22 -0600 Subject: [PATCH 10/15] #17: fix python 3 support --- ffmpeg/__init__.py | 2 ++ ffmpeg/_ffmpeg.py | 1 + ffmpeg/_filters.py | 1 + ffmpeg/_utils.py | 6 +++++- ffmpeg/dag.py | 8 +++++--- ffmpeg/nodes.py | 5 +++-- 6 files changed, 17 insertions(+), 6 deletions(-) diff --git a/ffmpeg/__init__.py b/ffmpeg/__init__.py index 1953609..154d8b6 100644 --- a/ffmpeg/__init__.py +++ b/ffmpeg/__init__.py @@ -1,6 +1,8 @@ from __future__ import unicode_literals + from . import _filters, _ffmpeg, _run from ._filters import * from ._ffmpeg import * from ._run import * + __all__ = _filters.__all__ + _ffmpeg.__all__ + _run.__all__ diff --git a/ffmpeg/_ffmpeg.py b/ffmpeg/_ffmpeg.py index 8dc6c87..968e793 100644 --- a/ffmpeg/_ffmpeg.py +++ b/ffmpeg/_ffmpeg.py @@ -1,4 +1,5 @@ from __future__ import unicode_literals + from .nodes import ( filter_operator, GlobalNode, diff --git a/ffmpeg/_filters.py b/ffmpeg/_filters.py index 79949ab..a422f00 100644 --- a/ffmpeg/_filters.py +++ b/ffmpeg/_filters.py @@ -1,4 +1,5 @@ from __future__ import unicode_literals + from .nodes import ( FilterNode, filter_operator, diff --git a/ffmpeg/_utils.py b/ffmpeg/_utils.py index 04f2add..06c5765 100644 --- a/ffmpeg/_utils.py +++ b/ffmpeg/_utils.py @@ -1,4 +1,8 @@ -import hashlib +from __future__ import unicode_literals + +from builtins import str +from past.builtins import basestring +import hashlib def _recursive_repr(item): diff --git a/ffmpeg/dag.py b/ffmpeg/dag.py index c8a634e..3ce3891 100644 --- a/ffmpeg/dag.py +++ b/ffmpeg/dag.py @@ -1,3 +1,5 @@ +from __future__ import unicode_literals + from ._utils import get_hash, get_hash_int from builtins import object from collections import namedtuple @@ -72,14 +74,14 @@ DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstrea def get_incoming_edges(downstream_node, incoming_edge_map): edges = [] - for downstream_label, (upstream_node, upstream_label) in incoming_edge_map.items(): + for downstream_label, (upstream_node, upstream_label) in list(incoming_edge_map.items()): edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] return edges def get_outgoing_edges(upstream_node, outgoing_edge_map): edges = [] - for upstream_label, downstream_infos in outgoing_edge_map.items(): + for upstream_label, downstream_infos in list(outgoing_edge_map.items()): for (downstream_node, downstream_label) in downstream_infos: edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)] return edges @@ -91,7 +93,7 @@ class KwargReprNode(DagNode): @property def __upstream_hashes(self): hashes = [] - for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items(): + for downstream_label, (upstream_node, upstream_label) in list(self.incoming_edge_map.items()): hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label]] return hashes diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index b458fae..bc1d55d 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +from builtins import object from .dag import KwargReprNode from ._utils import get_hash_int @@ -49,7 +50,7 @@ class Node(KwargReprNode): @classmethod def __check_input_types(cls, stream_map, incoming_stream_types): - for stream in stream_map.values(): + for stream in list(stream_map.values()): if not _is_of_types(stream, incoming_stream_types): raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}' .format(_get_types_str(incoming_stream_types), type(stream))) @@ -69,7 +70,7 @@ class Node(KwargReprNode): @classmethod def __get_incoming_edge_map(cls, stream_map): incoming_edge_map = {} - for downstream_label, upstream in stream_map.items(): + for downstream_label, upstream in list(stream_map.items()): incoming_edge_map[downstream_label] = (upstream.node, upstream.label) return incoming_edge_map From 8337d34b8248134da99164e8a294f66796106e24 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 03:35:03 -0600 Subject: [PATCH 11/15] #17: add proper handling of `split` operator --- ffmpeg/_run.py | 38 ++++++++++++++++++++++--------- ffmpeg/nodes.py | 26 ++++++++++----------- ffmpeg/tests/test_ffmpeg.py | 45 ++++++++++++++++++++++--------------- 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 81acb47..eb00c90 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -1,6 +1,6 @@ from __future__ import unicode_literals -from .dag import topo_sort +from .dag import get_outgoing_edges, topo_sort from functools import reduce from past.builtins import basestring import copy @@ -53,16 +53,31 @@ def _get_input_args(input_node): return args -def _get_filter_spec(i, node, stream_name_map): - stream_name = _get_stream_name('v{}'.format(i)) - stream_name_map[node] = stream_name - inputs = [stream_name_map[edge.upstream_node] for edge in node.incoming_edges] - filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(), stream_name) +def _get_filter_spec(node, outgoing_edge_map, stream_name_map): + incoming_edges = node.incoming_edges + outgoing_edges = get_outgoing_edges(node, outgoing_edge_map) + inputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in incoming_edges] + outputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in outgoing_edges] + filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(), ''.join(outputs)) return filter_spec -def _get_filter_arg(filter_nodes, stream_name_map): - filter_specs = [_get_filter_spec(i, node, stream_name_map) for i, node in enumerate(filter_nodes)] +def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map): + stream_count = 0 + for upstream_node in filter_nodes: + outgoing_edge_map = outgoing_edge_maps[upstream_node] + for upstream_label, downstreams in outgoing_edge_map.items(): + if len(downstreams) > 1: + # TODO: automatically insert `splits` ahead of time via graph transformation. + raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a ' + '`split` filter is probably required'.format(upstream_node, upstream_label)) + stream_name_map[upstream_node, upstream_label] = _get_stream_name('s{}'.format(stream_count)) + stream_count += 1 + + +def _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map): + _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map) + filter_specs = [_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map) for node in filter_nodes] return ';'.join(filter_specs) @@ -78,7 +93,8 @@ def _get_output_args(node, stream_name_map): raise ValueError('Unsupported output node: {}'.format(node)) args = [] assert len(node.incoming_edges) == 1 - stream_name = stream_name_map[node.incoming_edges[0].upstream_node] + edge = node.incoming_edges[0] + stream_name = stream_name_map[edge.upstream_node, edge.upstream_label] if stream_name != '[0]': args += ['-map', stream_name] kwargs = copy.copy(node.kwargs) @@ -104,8 +120,8 @@ def get_args(stream): isinstance(node, GlobalNode)] global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)] filter_nodes = [node for node in sorted_nodes if node not in (input_nodes + output_nodes + global_nodes)] - stream_name_map = {node: _get_stream_name(i) for i, node in enumerate(input_nodes)} - filter_arg = _get_filter_arg(filter_nodes, stream_name_map) + stream_name_map = {(node, None): _get_stream_name(i) for i, node in enumerate(input_nodes)} + filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map) args += reduce(operator.add, [_get_input_args(node) for node in input_nodes]) if filter_arg: args += ['-filter_complex', filter_arg] diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index bc1d55d..17b4bc9 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -39,6 +39,18 @@ class Stream(object): return out +def get_stream_map(stream_spec): + if stream_spec is None: + stream_map = {} + elif isinstance(stream_spec, Stream): + stream_map = {None: stream_spec} + elif isinstance(stream_spec, (list, tuple)): + stream_map = dict(enumerate(stream_spec)) + elif isinstance(stream_spec, dict): + stream_map = stream_spec + return stream_map + + class Node(KwargReprNode): """Node base""" @classmethod @@ -55,18 +67,6 @@ class Node(KwargReprNode): raise TypeError('Expected incoming stream(s) to be of one of the following types: {}; got {}' .format(_get_types_str(incoming_stream_types), type(stream))) - @classmethod - def __get_stream_map(cls, stream_spec): - if stream_spec is None: - stream_map = {} - elif isinstance(stream_spec, Stream): - stream_map = {None: stream_spec} - elif isinstance(stream_spec, (list, tuple)): - stream_map = dict(enumerate(stream_spec)) - elif isinstance(stream_spec, dict): - stream_map = stream_spec - return stream_map - @classmethod def __get_incoming_edge_map(cls, stream_map): incoming_edge_map = {} @@ -76,7 +76,7 @@ class Node(KwargReprNode): def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args, kwargs): - stream_map = self.__get_stream_map(stream_spec) + stream_map = get_stream_map(stream_spec) self.__check_input_len(stream_map, min_inputs, max_inputs) self.__check_input_types(stream_map, incoming_stream_types) incoming_edge_map = self.__get_incoming_edge_map(stream_map) diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index cd52837..32b8be5 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -93,12 +93,19 @@ def test_get_args_simple(): def _get_complex_filter_example(): - in_file = ffmpeg.input(TEST_INPUT_FILE) + split = (ffmpeg + .input(TEST_INPUT_FILE) + .vflip() + .split() + ) + split0 = split[0] + split1 = split[1] + overlay_file = ffmpeg.input(TEST_OVERLAY_FILE) return (ffmpeg .concat( - in_file.trim(start_frame=10, end_frame=20), - in_file.trim(start_frame=30, end_frame=40), + split0.trim(start_frame=10, end_frame=20), + split1.trim(start_frame=30, end_frame=40), ) .overlay(overlay_file.hflip()) .drawbox(50, 50, 120, 120, color='red', thickness=5) @@ -110,21 +117,23 @@ def _get_complex_filter_example(): def test_get_args_complex_filter(): out = _get_complex_filter_example() args = ffmpeg.get_args(out) - assert args == [ - '-i', TEST_INPUT_FILE, + assert args == ['-i', TEST_INPUT_FILE, '-i', TEST_OVERLAY_FILE, '-filter_complex', - '[0]trim=end_frame=20:start_frame=10[v0];' \ - '[0]trim=end_frame=40:start_frame=30[v1];' \ - '[v0][v1]concat=n=2[v2];' \ - '[1]hflip[v3];' \ - '[v2][v3]overlay=eof_action=repeat[v4];' \ - '[v4]drawbox=50:50:120:120:red:t=5[v5]', - '-map', '[v5]', os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4'), + '[0]vflip[s0];' \ + '[s0]split[s1][s2];' \ + '[s1]trim=end_frame=20:start_frame=10[s3];' \ + '[s2]trim=end_frame=40:start_frame=30[s4];' \ + '[s3][s4]concat=n=2[s5];' \ + '[1]hflip[s6];' \ + '[s5][s6]overlay=eof_action=repeat[s7];' \ + '[s7]drawbox=50:50:120:120:red:t=5[s8]', + '-map', '[s8]', os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4'), '-y' ] + #def test_version(): # subprocess.check_call(['ffmpeg', '-version']) @@ -156,8 +165,8 @@ def test_custom_filter(): node = ffmpeg.output(node, 'dummy2.mp4') assert node.get_args() == [ '-i', 'dummy.mp4', - '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[v0]', - '-map', '[v0]', + '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]', + '-map', '[s0]', 'dummy2.mp4' ] @@ -170,8 +179,8 @@ def test_custom_filter_fluent(): ) assert node.get_args() == [ '-i', 'dummy.mp4', - '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[v0]', - '-map', '[v0]', + '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]', + '-map', '[s0]', 'dummy2.mp4' ] @@ -197,8 +206,8 @@ def test_pipe(): '-pixel_format', 'rgb24', '-i', 'pipe:0', '-filter_complex', - '[0]trim=start_frame=2[v0]', - '-map', '[v0]', + '[0]trim=start_frame=2[s0]', + '-map', '[s0]', '-f', 'rawvideo', 'pipe:1' ] From c6e2f05e5b1509d0d262fcee56a348db520c02f4 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Thu, 6 Jul 2017 03:42:03 -0600 Subject: [PATCH 12/15] #17: add `short_repr` for input and output nodes --- ffmpeg/nodes.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 17b4bc9..c2b525e 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -1,8 +1,9 @@ from __future__ import unicode_literals -from builtins import object from .dag import KwargReprNode from ._utils import get_hash_int +from builtins import object +import os def _is_of_types(obj, types): @@ -115,6 +116,10 @@ class InputNode(Node): kwargs=kwargs ) + @property + def short_repr(self): + return os.path.basename(self.kwargs['filename']) + class FilterNode(Node): def __init__(self, stream_spec, name, max_inputs=1, args=[], kwargs={}): @@ -152,6 +157,10 @@ class OutputNode(Node): kwargs=kwargs ) + @property + def short_repr(self): + return os.path.basename(self.kwargs['filename']) + class OutputStream(Stream): def __init__(self, upstream_node, upstream_label): From 5d78a2595d2c11236726e456e4887448751475c7 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Sun, 9 Jul 2017 15:50:51 -0600 Subject: [PATCH 13/15] #17: fix `merge_outputs`; allow `stream_spec` in `get_args`+`run` --- .gitignore | 2 +- ffmpeg/_run.py | 27 +++--- ffmpeg/nodes.py | 24 ++++-- .../tests/sample_data/{dummy.mp4 => in1.mp4} | Bin ffmpeg/tests/test_ffmpeg.py | 80 +++++++++++++----- 5 files changed, 96 insertions(+), 37 deletions(-) rename ffmpeg/tests/sample_data/{dummy.mp4 => in1.mp4} (100%) diff --git a/.gitignore b/.gitignore index f0d6df7..3179ebc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,6 @@ .eggs .tox/ dist/ -ffmpeg/tests/sample_data/dummy2.mp4 +ffmpeg/tests/sample_data/out*.mp4 ffmpeg_python.egg-info/ venv* diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index eb00c90..83117e4 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -13,11 +13,12 @@ from ._ffmpeg import ( overwrite_output, ) from .nodes import ( + get_stream_spec_nodes, + FilterNode, GlobalNode, InputNode, OutputNode, output_operator, - Stream, ) @@ -108,18 +109,16 @@ def _get_output_args(node, stream_name_map): @output_operator() -def get_args(stream): +def get_args(stream_spec, overwrite_output=False): """Get command-line arguments for ffmpeg.""" - if not isinstance(stream, Stream): - raise TypeError('Expected Stream; got {}'.format(type(stream))) + nodes = get_stream_spec_nodes(stream_spec) args = [] # TODO: group nodes together, e.g. `-i somefile -r somerate`. - sorted_nodes, outgoing_edge_maps = topo_sort([stream.node]) + sorted_nodes, outgoing_edge_maps = topo_sort(nodes) input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)] - output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not - isinstance(node, GlobalNode)] + output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)] global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)] - filter_nodes = [node for node in sorted_nodes if node not in (input_nodes + output_nodes + global_nodes)] + filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)] stream_name_map = {(node, None): _get_stream_name(i) for i, node in enumerate(input_nodes)} filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map) args += reduce(operator.add, [_get_input_args(node) for node in input_nodes]) @@ -127,17 +126,23 @@ def get_args(stream): args += ['-filter_complex', filter_arg] args += reduce(operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]) args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], []) + if overwrite_output: + args += ['-y'] return args @output_operator() -def run(node, cmd='ffmpeg'): - """Run ffmpeg on node graph.""" +def run(stream_spec, cmd='ffmpeg', **kwargs): + """Run ffmpeg on node graph. + + Args: + **kwargs: keyword-arguments passed to ``get_args()`` (e.g. ``overwrite_output=True``). + """ if isinstance(cmd, basestring): cmd = [cmd] elif type(cmd) != list: cmd = list(cmd) - args = cmd + node.get_args() + args = cmd + get_args(stream_spec, **kwargs) _subprocess.check_call(args) diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index c2b525e..11b8f85 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -52,6 +52,20 @@ def get_stream_map(stream_spec): return stream_map +def get_stream_map_nodes(stream_map): + nodes = [] + for stream in stream_map.values(): + if not isinstance(stream, Stream): + raise TypeError('Expected Stream; got {}'.format(type(stream))) + nodes.append(stream.node) + return nodes + + +def get_stream_spec_nodes(stream_spec): + stream_map = get_stream_map(stream_spec) + return get_stream_map_nodes(stream_map) + + class Node(KwargReprNode): """Node base""" @classmethod @@ -75,8 +89,8 @@ class Node(KwargReprNode): incoming_edge_map[downstream_label] = (upstream.node, upstream.label) return incoming_edge_map - def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args, - kwargs): + def __init__(self, stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args=[], + kwargs={}): stream_map = get_stream_map(stream_spec) self.__check_input_len(stream_map, min_inputs, max_inputs) self.__check_input_types(stream_map, incoming_stream_types) @@ -164,13 +178,13 @@ class OutputNode(Node): class OutputStream(Stream): def __init__(self, upstream_node, upstream_label): - super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode}) + super(OutputStream, self).__init__(upstream_node, upstream_label, {OutputNode, GlobalNode, MergeOutputsNode}) class MergeOutputsNode(Node): - def __init__(self, stream, name): + def __init__(self, streams, name): super(MergeOutputsNode, self).__init__( - stream_spec=None, + stream_spec=streams, name=name, incoming_stream_types={OutputStream}, outgoing_stream_type=OutputStream, diff --git a/ffmpeg/tests/sample_data/dummy.mp4 b/ffmpeg/tests/sample_data/in1.mp4 similarity index 100% rename from ffmpeg/tests/sample_data/dummy.mp4 rename to ffmpeg/tests/sample_data/in1.mp4 diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 32b8be5..47f8f72 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -8,9 +8,10 @@ import random TEST_DIR = os.path.dirname(__file__) SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data') -TEST_INPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'dummy.mp4') +TEST_INPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'in1.mp4') TEST_OVERLAY_FILE = os.path.join(SAMPLE_DATA_DIR, 'overlay.png') -TEST_OUTPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4') +TEST_OUTPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'out1.mp4') +TEST_OUTPUT_FILE2 = os.path.join(SAMPLE_DATA_DIR, 'out2.mp4') subprocess.check_call(['ffmpeg', '-version']) @@ -94,7 +95,7 @@ def test_get_args_simple(): def _get_complex_filter_example(): split = (ffmpeg - .input(TEST_INPUT_FILE) + .input(TEST_INPUT_FILE1) .vflip() .split() ) @@ -109,7 +110,7 @@ def _get_complex_filter_example(): ) .overlay(overlay_file.hflip()) .drawbox(50, 50, 120, 120, color='red', thickness=5) - .output(TEST_OUTPUT_FILE) + .output(TEST_OUTPUT_FILE1) .overwrite_output() ) @@ -117,7 +118,7 @@ def _get_complex_filter_example(): def test_get_args_complex_filter(): out = _get_complex_filter_example() args = ffmpeg.get_args(out) - assert args == ['-i', TEST_INPUT_FILE, + assert args == ['-i', TEST_INPUT_FILE1, '-i', TEST_OVERLAY_FILE, '-filter_complex', '[0]vflip[s0];' \ @@ -128,7 +129,7 @@ def test_get_args_complex_filter(): '[1]hflip[s6];' \ '[s5][s6]overlay=eof_action=repeat[s7];' \ '[s7]drawbox=50:50:120:120:red:t=5[s8]', - '-map', '[s8]', os.path.join(SAMPLE_DATA_DIR, 'dummy2.mp4'), + '-map', '[s8]', TEST_OUTPUT_FILE1, '-y' ] @@ -139,31 +140,38 @@ def test_get_args_complex_filter(): def test_run(): - node = _get_complex_filter_example() - ffmpeg.run(node) + stream = _get_complex_filter_example() + ffmpeg.run(stream) + + +def test_run_multi_output(): + in_ = ffmpeg.input(TEST_INPUT_FILE1) + out1 = in_.output(TEST_OUTPUT_FILE1) + out2 = in_.output(TEST_OUTPUT_FILE2) + ffmpeg.run([out1, out2], overwrite_output=True) def test_run_dummy_cmd(): - node = _get_complex_filter_example() - ffmpeg.run(node, cmd='true') + stream = _get_complex_filter_example() + ffmpeg.run(stream, cmd='true') def test_run_dummy_cmd_list(): - node = _get_complex_filter_example() - ffmpeg.run(node, cmd=['true', 'ignored']) + stream = _get_complex_filter_example() + ffmpeg.run(stream, cmd=['true', 'ignored']) def test_run_failing_cmd(): - node = _get_complex_filter_example() + stream = _get_complex_filter_example() with pytest.raises(subprocess.CalledProcessError): - ffmpeg.run(node, cmd='false') + ffmpeg.run(stream, cmd='false') def test_custom_filter(): - node = ffmpeg.input('dummy.mp4') - node = ffmpeg.filter_(node, 'custom_filter', 'a', 'b', kwarg1='c') - node = ffmpeg.output(node, 'dummy2.mp4') - assert node.get_args() == [ + stream = ffmpeg.input('dummy.mp4') + stream = ffmpeg.filter_(stream, 'custom_filter', 'a', 'b', kwarg1='c') + stream = ffmpeg.output(stream, 'dummy2.mp4') + assert stream.get_args() == [ '-i', 'dummy.mp4', '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]', '-map', '[s0]', @@ -172,12 +180,12 @@ def test_custom_filter(): def test_custom_filter_fluent(): - node = (ffmpeg + stream = (ffmpeg .input('dummy.mp4') .filter_('custom_filter', 'a', 'b', kwarg1='c') .output('dummy2.mp4') ) - assert node.get_args() == [ + assert stream.get_args() == [ '-i', 'dummy.mp4', '-filter_complex', '[0]custom_filter=a:b:kwarg1=c[s0]', '-map', '[s0]', @@ -185,6 +193,38 @@ def test_custom_filter_fluent(): ] +def test_merge_outputs(): + in_ = ffmpeg.input('in.mp4') + out1 = in_.output('out1.mp4') + out2 = in_.output('out2.mp4') + assert ffmpeg.merge_outputs(out1, out2).get_args() == [ + '-i', 'in.mp4', 'out1.mp4', 'out2.mp4' + ] + assert ffmpeg.get_args([out1, out2]) == [ + '-i', 'in.mp4', 'out2.mp4', 'out1.mp4' + ] + + +def test_multi_passthrough(): + out1 = ffmpeg.input('in1.mp4').output('out1.mp4') + out2 = ffmpeg.input('in2.mp4').output('out2.mp4') + out = ffmpeg.merge_outputs(out1, out2) + assert ffmpeg.get_args(out) == [ + '-i', 'in1.mp4', + '-i', 'in2.mp4', + 'out1.mp4', + '-map', '[1]', # FIXME: this should not be here (see #23) + 'out2.mp4' + ] + assert ffmpeg.get_args([out1, out2]) == [ + '-i', 'in2.mp4', + '-i', 'in1.mp4', + 'out2.mp4', + '-map', '[1]', # FIXME: this should not be here (see #23) + 'out1.mp4' + ] + + def test_pipe(): width = 32 height = 32 From cf1b7bfd4bb50ed25dd6a31eab5611745c37c987 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Mon, 10 Jul 2017 23:38:30 -0600 Subject: [PATCH 14/15] #17: auto-generate split output count --- ffmpeg/_run.py | 2 +- ffmpeg/nodes.py | 14 ++++++++++---- ffmpeg/tests/test_ffmpeg.py | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 83117e4..8e4af83 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -59,7 +59,7 @@ def _get_filter_spec(node, outgoing_edge_map, stream_name_map): outgoing_edges = get_outgoing_edges(node, outgoing_edge_map) inputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in incoming_edges] outputs = [stream_name_map[edge.upstream_node, edge.upstream_label] for edge in outgoing_edges] - filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(), ''.join(outputs)) + filter_spec = '{}{}{}'.format(''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs)) return filter_spec diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 11b8f85..ca060be 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -148,11 +148,17 @@ class FilterNode(Node): kwargs=kwargs ) - def _get_filter(self): - params_text = self.name - arg_params = ['{}'.format(arg) for arg in self.args] - kwarg_params = ['{}={}'.format(k, self.kwargs[k]) for k in sorted(self.kwargs)] + def _get_filter(self, outgoing_edges): + args = self.args + kwargs = self.kwargs + if self.name == 'split': + args = [len(outgoing_edges)] + + arg_params = ['{}'.format(arg) for arg in args] + kwarg_params = ['{}={}'.format(k, kwargs[k]) for k in sorted(kwargs)] params = arg_params + kwarg_params + + params_text = self.name if params: params_text += '={}'.format(':'.join(params)) return params_text diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 47f8f72..a0d1396 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -122,7 +122,7 @@ def test_get_args_complex_filter(): '-i', TEST_OVERLAY_FILE, '-filter_complex', '[0]vflip[s0];' \ - '[s0]split[s1][s2];' \ + '[s0]split=2[s1][s2];' \ '[s1]trim=end_frame=20:start_frame=10[s3];' \ '[s2]trim=end_frame=40:start_frame=30[s4];' \ '[s3][s4]concat=n=2[s5];' \ From 4640adabe0df764d22bfeaceb663f8327d2de9f1 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Wed, 12 Jul 2017 02:10:44 -0600 Subject: [PATCH 15/15] Futurize --- ffmpeg/_run.py | 2 +- ffmpeg/nodes.py | 4 ++-- ffmpeg/tests/test_ffmpeg.py | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 8e4af83..91a1462 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -67,7 +67,7 @@ def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_ stream_count = 0 for upstream_node in filter_nodes: outgoing_edge_map = outgoing_edge_maps[upstream_node] - for upstream_label, downstreams in outgoing_edge_map.items(): + for upstream_label, downstreams in list(outgoing_edge_map.items()): if len(downstreams) > 1: # TODO: automatically insert `splits` ahead of time via graph transformation. raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a ' diff --git a/ffmpeg/nodes.py b/ffmpeg/nodes.py index 5122ad0..2b4c94f 100644 --- a/ffmpeg/nodes.py +++ b/ffmpeg/nodes.py @@ -54,7 +54,7 @@ def get_stream_map(stream_spec): def get_stream_map_nodes(stream_map): nodes = [] - for stream in stream_map.values(): + for stream in list(stream_map.values()): if not isinstance(stream, Stream): raise TypeError('Expected Stream; got {}'.format(type(stream))) nodes.append(stream.node) @@ -157,7 +157,7 @@ class FilterNode(Node): out_args = [escape_chars(x, '\\\'=:') for x in args] out_kwargs = {} - for k, v in kwargs.items(): + for k, v in list(kwargs.items()): k = escape_chars(k, '\\\'=:') v = escape_chars(v, '\\\'=:') out_kwargs[k] = v diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 11734cb..de4f2af 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -1,5 +1,7 @@ from __future__ import unicode_literals +from builtins import bytes +from builtins import range import ffmpeg import os import pytest @@ -171,7 +173,7 @@ def test_filter_normal_arg_escape(): '=': 2, '\n': 0, } - for ch, expected_backslash_count in expected_backslash_counts.items(): + for ch, expected_backslash_count in list(expected_backslash_counts.items()): expected = '{}{}'.format('\\' * expected_backslash_count, ch) actual = _get_drawtext_font_repr(ch) assert expected == actual @@ -205,7 +207,7 @@ def test_filter_text_arg_str_escape(): '=': 2, '\n': 0, } - for ch, expected_backslash_count in expected_backslash_counts.items(): + for ch, expected_backslash_count in list(expected_backslash_counts.items()): expected = '{}{}'.format('\\' * expected_backslash_count, ch) actual = _get_drawtext_text_repr(ch) assert expected == actual