Merge 34cc51d95d915fb2400e50c1c04e088520f41c75 into 7077eaad646e84004fbdeb052b034f9f68c15b5e

This commit is contained in:
Davide Depau 2018-05-09 06:31:19 +00:00 committed by GitHub
commit 5b564d7926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 5 deletions

View File

@ -76,3 +76,8 @@ def escape_chars(text, chars):
for ch in chars: for ch in chars:
text = text.replace(ch, '\\' + ch) text = text.replace(ch, '\\' + ch)
return text return text
def sort_ordereddict(d):
for key in sorted(d.keys()):
d[key] = d.pop(key)

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from ._utils import get_hash, get_hash_int from ._utils import get_hash, get_hash_int, sort_ordereddict
from builtins import object from builtins import object
from collections import namedtuple from collections import namedtuple, OrderedDict
class DagNode(object): class DagNode(object):
@ -162,7 +162,7 @@ def topo_sort(downstream_nodes):
raise RuntimeError('Graph is not a DAG') raise RuntimeError('Graph is not a DAG')
if downstream_node is not None: if downstream_node is not None:
outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {}) outgoing_edge_map = outgoing_edge_maps.get(upstream_node, OrderedDict())
outgoing_edge_infos = outgoing_edge_map.get(upstream_label, []) outgoing_edge_infos = outgoing_edge_map.get(upstream_label, [])
outgoing_edge_infos += [(downstream_node, downstream_label, downstream_selector)] outgoing_edge_infos += [(downstream_node, downstream_label, downstream_selector)]
outgoing_edge_map[upstream_label] = outgoing_edge_infos outgoing_edge_map[upstream_label] = outgoing_edge_infos
@ -179,4 +179,9 @@ def topo_sort(downstream_nodes):
while unmarked_nodes: while unmarked_nodes:
upstream_node, upstream_label = unmarked_nodes.pop() upstream_node, upstream_label = unmarked_nodes.pop()
visit(upstream_node, upstream_label, None, None) visit(upstream_node, upstream_label, None, None)
# Sort outgoing edge maps by upstream label
for map in outgoing_edge_maps.values():
sort_ordereddict(map)
return sorted_nodes, outgoing_edge_maps return sorted_nodes, outgoing_edge_maps

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from collections import OrderedDict
from .dag import KwargReprNode from .dag import KwargReprNode
from ._utils import escape_chars, get_hash_int from ._utils import escape_chars, get_hash_int
from builtins import object from builtins import object
@ -69,7 +71,7 @@ def get_stream_map(stream_spec):
elif isinstance(stream_spec, Stream): elif isinstance(stream_spec, Stream):
stream_map = {None: stream_spec} stream_map = {None: stream_spec}
elif isinstance(stream_spec, (list, tuple)): elif isinstance(stream_spec, (list, tuple)):
stream_map = dict(enumerate(stream_spec)) stream_map = OrderedDict(enumerate(stream_spec))
elif isinstance(stream_spec, dict): elif isinstance(stream_spec, dict):
stream_map = stream_spec stream_map = stream_spec
return stream_map return stream_map
@ -108,7 +110,7 @@ class Node(KwargReprNode):
@classmethod @classmethod
def __get_incoming_edge_map(cls, stream_map): def __get_incoming_edge_map(cls, stream_map):
incoming_edge_map = {} incoming_edge_map = OrderedDict()
for downstream_label, upstream in list(stream_map.items()): for downstream_label, upstream in list(stream_map.items()):
incoming_edge_map[downstream_label] = (upstream.node, upstream.label, upstream.selector) incoming_edge_map[downstream_label] = (upstream.node, upstream.label, upstream.selector)
return incoming_edge_map return incoming_edge_map