Merge e8c51f2b20678c59f9dffdea7ec93cc652b4dc72 into 9c1661f3d74189bd96a9f2a7dbfb4c3ec90be934

This commit is contained in:
Davide Depau 2017-07-12 08:14:10 +00:00 committed by GitHub
commit e80ed5b4eb
2 changed files with 6 additions and 4 deletions

View File

@ -48,8 +48,8 @@ def filter_(stream_spec, filter_name, *args, **kwargs):
@filter_operator()
def split(stream):
return FilterNode(stream, split.__name__)
def split(stream, splits=None):
return FilterNode(stream, split.__name__, args=[splits] if splits else [],)
@filter_operator()

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals
import warnings
from .dag import get_outgoing_edges, topo_sort
from functools import reduce
from past.builtins import basestring
@ -70,8 +72,8 @@ def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_
for upstream_label, downstreams in list(outgoing_edge_map.items()):
if len(downstreams) > 1:
# TODO: automatically insert `splits` ahead of time via graph transformation.
raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
'`split` filter is probably required'.format(upstream_node, upstream_label))
warnings.warn('Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
'`split` filter is probably required'.format(upstream_node, upstream_label), RuntimeWarning)
stream_name_map[upstream_node, upstream_label] = _get_stream_name('s{}'.format(stream_count))
stream_count += 1