diff --git a/.travis.yml b/.travis.yml index d8657a8f..3350051a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ python: - "2.7" - "3.4" - "3.5" + - "3.6" install: - pip install Sphinx sphinx_rtd_theme codecov packaging diff --git a/graphkit/base.py b/graphkit/base.py index 1c04e8d5..2e036468 100644 --- a/graphkit/base.py +++ b/graphkit/base.py @@ -1,5 +1,10 @@ # Copyright 2016, Yahoo Inc. # Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +try: + from collections import abc +except ImportError: + import collections as abc + class Data(object): """ @@ -151,9 +156,12 @@ def __init__(self, **kwargs): # set execution mode to single-threaded sequential by default self._execution_method = "sequential" + self._overwrites_collector = None def _compute(self, named_inputs, outputs=None): - return self.net.compute(outputs, named_inputs, method=self._execution_method) + return self.net.compute( + outputs, named_inputs, method=self._execution_method, + overwrites_collector=self._overwrites_collector) def __call__(self, *args, **kwargs): return self._compute(*args, **kwargs) @@ -162,15 +170,35 @@ def set_execution_method(self, method): """ Determine how the network will be executed. - Args: - method: str - If "parallel", execute graph operations concurrently - using a threadpool. + :param str method: + If "parallel", execute graph operations concurrently + using a threadpool. """ - options = ['parallel', 'sequential'] - assert method in options + choices = ['parallel', 'sequential'] + if method not in choices: + raise ValueError( + "Invalid computation method %r! Must be one of %s" + (method, choices)) self._execution_method = method + def set_overwrites_collector(self, collector): + """ + Asks to put all *overwrites* into the `collector` after computing + + An "overwrites" is intermediate value calculated but NOT stored + into the results, becaues it has been given also as an intemediate + input value, and the operation that would overwrite it MUST run for + its other results. + + :param collector: + a mutable dict to be fillwed with named values + """ + if collector is not None and not isinstance(collector, abc.MutableMapping): + raise ValueError( + "Overwrites collector was not a MutableMapping, but: %r" + % collector) + self._overwrites_collector = collector + def plot(self, filename=None, show=False): self.net.plot(filename=filename, show=show) diff --git a/graphkit/functional.py b/graphkit/functional.py index 65388973..c113a298 100644 --- a/graphkit/functional.py +++ b/graphkit/functional.py @@ -3,6 +3,8 @@ from itertools import chain +from boltons.setutils import IndexedSet as iset + from .base import Operation, NetworkOperation from .network import Network from .modifiers import optional @@ -28,7 +30,7 @@ def _compute(self, named_inputs, outputs=None): result = zip(self.provides, result) if outputs: - outputs = set(outputs) + outputs = sorted(set(outputs)) result = filter(lambda x: x[0] in outputs, result) return dict(result) @@ -185,27 +187,29 @@ def __call__(self, *operations): # If merge is desired, deduplicate operations before building network if self.merge: - merge_set = set() + merge_set = iset() # Preseve given node order. 
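# Illustrative sketch (not part of the patch): how the new
# ``set_overwrites_collector()`` and the stricter ``set_execution_method()``
# introduced above are meant to be used.  The compose/operation API and the
# pinning behaviour are the ones exercised by the tests later in this diff;
# the names "split"/"join" are hypothetical.
from operator import add
from graphkit import compose, operation

pipeline = compose(name="demo")(
    # provides both the possibly-overridden value and another needed one
    operation(name="split", needs=["a"], provides=["mid", "double"])(lambda a: (a, 2 * a)),
    operation(name="join", needs=["mid", "double"], provides=["out"])(add),
)

overwrites = {}                               # any MutableMapping is accepted
pipeline.set_overwrites_collector(overwrites)
pipeline.set_execution_method("sequential")   # only "sequential"/"parallel" are accepted

result = pipeline({"a": 5, "mid": 1})
# "split" still runs (its "double" output is needed), but it may not clobber the
# given "mid": result["mid"] == 1 while the computed value lands in the collector,
# i.e. overwrites == {"mid": 5}.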
for op in operations: if isinstance(op, NetworkOperation): - net_ops = filter(lambda x: isinstance(x, Operation), op.net.steps) + op.net.compile() + net_ops = filter(lambda x: isinstance(x, Operation), + op.net.execution_plan) merge_set.update(net_ops) else: merge_set.add(op) - operations = list(merge_set) + operations = merge_set def order_preserving_uniquifier(seq, seen=None): - seen = seen if seen else set() + seen = seen if seen else set() # unordered, not iterated seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))] provides = order_preserving_uniquifier(chain(*[op.provides for op in operations])) - needs = order_preserving_uniquifier(chain(*[op.needs for op in operations]), set(provides)) + needs = order_preserving_uniquifier(chain(*[op.needs for op in operations]), + set(provides)) # unordered, not iterated - # compile network + # Build network net = Network() for op in operations: net.add_op(op) - net.compile() return NetworkOperation(name=self.name, needs=needs, provides=provides, params={}, net=net) diff --git a/graphkit/network.py b/graphkit/network.py index 0df3ddf8..e8240056 100644 --- a/graphkit/network.py +++ b/graphkit/network.py @@ -1,19 +1,41 @@ # Copyright 2016, Yahoo Inc. # Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. - -import time +"""" The main implementation of the network of operations & data to compute. """ import os +import sys +import time import networkx as nx +from collections import defaultdict from io import StringIO +from itertools import chain + + +from boltons.setutils import IndexedSet as iset from .base import Operation +from .modifiers import optional + + + +from networkx import DiGraph +if sys.version_info < (3, 6): + """ + Consistently ordered variant of :class:`~networkx.DiGraph`. + + PY3.6 has inmsertion-order dicts, but PY3.5 has not. + And behvavior *and TCs) in these environments may fail spuriously! + Still *subgraphs* may not patch! + + Fix from: + https://networkx.github.io/documentation/latest/reference/classes/ordered.html#module-networkx.classes.ordered + """ + from networkx import OrderedDiGraph as DiGraph class DataPlaceholderNode(str): """ - A node for the Network graph that describes the name of a Data instance - produced or required by a layer. + Dag node naming a data-value produced or required by an operation. """ def __repr__(self): return 'DataPlaceholderNode("%s")' % self @@ -21,18 +43,94 @@ def __repr__(self): class DeleteInstruction(str): """ - An instruction for the compiled list of evaluation steps to free or delete - a Data instance from the Network's cache after it is no longer needed. + Execution step to delete a computed value from the network's ``cache``. + + It is an :attr:`Network.execution_plan` step for the data-node `str` that + frees its data-value from ``cache`` after it is no longer needed, + to reduce memory footprint while computing the pipeline. """ def __repr__(self): return 'DeleteInstruction("%s")' % self +class PinInstruction(str): + """ + Execution step to replace a computed value in the ``cache`` from the inputs, + + and to store the computed one in the ``overwrites`` instead + (both ``cache`` & ``overwrites`` are local-vars in :meth:`Network.compute()`). 
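# Sketch of why the conditional DiGraph import above exists: networkx iterates
# nodes in the insertion order of its backing dict, which plain dicts only
# guarantee from CPython 3.6 on.  OrderedDiGraph keeps traversal (and therefore
# the compiled plans) deterministic on older interpreters.
import sys
import networkx as nx

DiGraph = nx.OrderedDiGraph if sys.version_info < (3, 6) else nx.DiGraph

g = DiGraph()
g.add_edge("op1", "data-a")
g.add_edge("op1", "data-b")
list(g.nodes)   # ['op1', 'data-a', 'data-b'] in a stable order with the ordered variant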
+
+    It is an :attr:`Network.execution_plan` step for the data-node `str` that
+    ensures the corresponding intermediate input-value is not overwritten when
+    its providing function(s) could not be pruned, because their other outputs
+    are needed elsewhere.
+    """
+    def __repr__(self):
+        return 'PinInstruction("%s")' % self
+
+
 class Network(object):
     """
-    This is the main network implementation. The class contains all of the
-    code necessary to weave together operations into a directed-acyclic-graph (DAG)
-    and pass data through.
+    Assemble operations & data into a directed-acyclic-graph (DAG) to run them.
+
+    The execution of the contained *operations* in the dag (the computation)
+    is split into 2 phases:
+
+    - COMPILE: prune unsatisfied nodes, sort dag topologically & solve it, and
+      derive the *execution plan* (see below) based on the given *inputs*
+      and asked *outputs*.
+
+    - EXECUTE: sequential or parallel invocation of the underlying functions
+      of the operations with arguments from the ``cache``.
+
+    It is based on 5 data-structures:
+
+    :ivar graph:
+        A ``networkx`` DAG containing alternating layers of
+        :class:`Operation` and :class:`DataPlaceholderNode` nodes.
+        They are laid out and connected by repeated calls of :meth:`add_op`.
+
+        The computation starts with :meth:`_prune_dag()` extracting
+        a *DAG subgraph* by *pruning* its nodes based on given inputs and
+        requested outputs in :meth:`compute()`.
+    :ivar execution_dag:
+        It contains the nodes of the *pruned dag* from the last call to
+        :meth:`compile()`. This pruned subgraph is used to decide
+        the :attr:`execution_plan` (below).
+        It is cached in :attr:`_cached_compilations` across runs with
+        inputs/outputs as key.
+
+    :ivar execution_plan:
+        It is the list of the operation-nodes only
+        from the dag (above), topologically sorted, and interspersed with
+        *instruction steps* needed to complete the run.
+        It is built by :meth:`_build_execution_plan()` based on the subgraph dag
+        extracted above.
+        It is cached in :attr:`_cached_compilations` across runs with
+        inputs/outputs as key.
+
+        The *instructions* items achieve the following:
+
+        - :class:`DeleteInstruction`: delete items from values-cache as soon as
+          they are not needed further down the dag, to reduce memory footprint
+          while computing.
+
+        - :class:`PinInstruction`: avoid overwriting any given intermediate
+          inputs, and still allow their providing operations to run
+          (because they are needed for their other outputs).
+
+    :var cache:
+        a local-var in :meth:`compute()`, initialized on each run
+        to hold the values of the given inputs, generated (intermediate) data,
+        and output values.
+        It is returned as-is if no specific outputs are requested; no data-eviction
+        happens then.
+
+    :arg overwrites:
+        The optional argument given to :meth:`compute()` to collect the
+        intermediate *calculated* values that are overwritten by intermediate
+        (aka "pinned") input-values.
+
     """

     def __init__(self, **kwargs):
@@ -40,27 +138,30 @@ def __init__(self, **kwargs):
         """

         # directed graph of layer instances and data-names defining the net.
-        self.graph = nx.DiGraph()
+        self.graph = DiGraph()

         self._debug = kwargs.get("debug", False)

         # this holds the timing information for each layer
         self.times = {}

-        # a compiled list of steps to evaluate layers *in order* and free mem.
-        self.steps = []
+        #: The list of operation-nodes & *instructions* needed to evaluate
+        #: the given inputs & asked outputs, free memory and avoid overwriting
+        #: any given intermediate inputs.
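# Sketch (attribute names as documented above): inspecting the two compilation
# artefacts after running a composed pipeline; "sum1"/"sum2" are hypothetical.
from operator import add
from graphkit import compose, operation

two_sums = compose(name="two_sums")(
    operation(name="sum1", needs=["a", "b"], provides=["ab"])(add),
    operation(name="sum2", needs=["ab", "c"], provides=["abc"])(add),
)
two_sums({"a": 1, "b": 2, "c": 3}, outputs=["abc"])

net = two_sums.net
net.execution_dag    # pruned networkx sub-graph of Operation & data nodes
net.execution_plan   # operations interleaved with instructions, roughly
                     # [sum1, DeleteInstruction('a'), DeleteInstruction('b'),
                     #  sum2, DeleteInstruction('ab'), DeleteInstruction('c')]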
+ self.execution_plan = () + + #: Pruned graph of the last compilation. + self.execution_dag = () - # This holds a cache of results for the _find_necessary_steps - # function, this helps speed up the compute call as well avoid - # a multithreading issue that is occuring when accessing the - # graph in networkx - self._necessary_steps_cache = {} + #: Speed up :meth:`compile()` call and avoid a multithreading issue(?) + #: that is occuring when accessing the dag in networkx. + self._cached_compilations = {} def add_op(self, operation): """ Adds the given operation and its data requirements to the network graph - based on the name of the operation, the names of the operation's needs, and - the names of the data it provides. + based on the name of the operation, the names of the operation's needs, + and the names of the data it provides. :param Operation operation: Operation object to add. """ @@ -71,7 +172,11 @@ def add_op(self, operation): assert operation.provides is not None, "Operation's 'provides' must be named" # assert layer is only added once to graph - assert operation not in self.graph.nodes(), "Operation may only be added once" + assert operation not in self.graph.nodes, "Operation may only be added once" + + self.execution_dag = None + self.execution_plan = None + self._cached_compilations = {} # add nodes and edges to graph describing the data needs for this layer for n in operation.needs: @@ -81,142 +186,245 @@ def add_op(self, operation): for p in operation.provides: self.graph.add_edge(operation, DataPlaceholderNode(p)) - # clear compiled steps (must recompile after adding new layers) - self.steps = [] + def list_layers(self, debug=False): + # Make a generic plan. + plan = self._build_execution_plan(self.graph, ()) + return [n for n in plan if debug or isinstance(n, Operation)] - def list_layers(self): - assert self.steps, "network must be compiled before listing layers." - return [(s.name, s) for s in self.steps if isinstance(s, Operation)] + def show_layers(self, debug=False, ret=False): + """Shows info (name, needs, and provides) about all operations in this dag.""" + s = "\n".join(repr(n) for n in self.list_layers(debug=debug)) + if ret: + return s + else: + print(s) + + def _build_execution_plan(self, dag, inputs, outputs): + """ + Create the list of operation-nodes & *instructions* evaluating all - def show_layers(self): - """Shows info (name, needs, and provides) about all layers in this network.""" - for name, step in self.list_layers(): - print("layer_name: ", name) - print("\t", "needs: ", step.needs) - print("\t", "provides: ", step.provides) - print("") + operations & instructions needed a) to free memory and b) avoid + overwritting given intermediate inputs. + :param dag: + The original dag, pruned; not broken. + :param outputs: + outp-names to decide whether to add (and which) del-instructions - def compile(self): - """Create a set of steps for evaluating layers - and freeing memory as necessary""" + In the list :class:`DeleteInstructions` steps (DA) are inserted between + operation nodes to reduce the memory footprint of cached results. + A DA is inserted whenever a *need* is not used by any other *operation* + further down the DAG. + Note that since the *cache* is not reused across `compute()` invocations, + any memory-reductions are for as long as a single computation runs. - # clear compiled steps - self.steps = [] + """ + + plan = [] # create an execution order such that each layer's needs are provided. 
- ordered_nodes = list(nx.dag.topological_sort(self.graph)) + ordered_nodes = iset(nx.topological_sort(dag)) - # add Operations evaluation steps, and instructions to free data. + # Add Operations evaluation steps, and instructions to free and "pin" + # data. for i, node in enumerate(ordered_nodes): if isinstance(node, DataPlaceholderNode): - continue + if node in inputs and dag.pred[node]: + # Command pinning only when there is another operation + # generating this data as output. + plan.append(PinInstruction(node)) elif isinstance(node, Operation): + plan.append(node) - # add layer to list of steps - self.steps.append(node) + # Keep all values in cache if not specific outputs asked. + if not outputs: + continue # Add instructions to delete predecessors as possible. A # predecessor may be deleted if it is a data placeholder that # is no longer needed by future Operations. - for predecessor in self.graph.predecessors(node): + for need in self.graph.pred[node]: if self._debug: - print("checking if node %s can be deleted" % predecessor) - predecessor_still_needed = False + print("checking if node %s can be deleted" % need) for future_node in ordered_nodes[i+1:]: - if isinstance(future_node, Operation): - if predecessor in future_node.needs: - predecessor_still_needed = True - break - if not predecessor_still_needed: - if self._debug: - print(" adding delete instruction for %s" % predecessor) - self.steps.append(DeleteInstruction(predecessor)) + if ( + isinstance(future_node, Operation) + and need in future_node.needs + ): + break + else: + if need not in outputs: + if self._debug: + print(" adding delete instruction for %s" % need) + plan.append(DeleteInstruction(need)) else: - raise TypeError("Unrecognized network graph node") + raise AssertionError("Unrecognized network graph node %r" % node) + return plan - def _find_necessary_steps(self, outputs, inputs): + def _collect_unsatisfied_operations(self, dag, inputs): """ - Determines what graph steps need to pe run to get to the requested - outputs from the provided inputs. Eliminates steps that come before - (in topological order) any inputs that have been provided. Also - eliminates steps that are not on a path from the provided inputs to - the requested outputs. + Traverse topologically sorted dag to collect un-satisfied operations. + + Unsatisfied operations are those suffering from ANY of the following: + + - They are missing at least one compulsory need-input. + Since the dag is ordered, as soon as we're on an operation, + all its needs have been accounted, so we can get its satisfaction. + + - Their provided outputs are not linked to any data in the dag. + An operation might not have any output link when :meth:`_prune_dag()` + has broken them, due to given intermediate inputs. - :param list outputs: + :param dag: + a graph with broken edges those arriving to existing inputs + :param inputs: + an iterable of the names of the input values + return: + a list of unsatisfied operations to prune + """ + # To collect data that will be produced. + ok_data = set(inputs) + # To colect the map of operations --> satisfied-needs. + op_satisfaction = defaultdict(set) + # To collect the operations to drop. + unsatisfied = [] + for node in nx.topological_sort(dag): + if isinstance(node, Operation): + if not dag.adj[node]: + # Prune operations that ended up providing no output. 
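# Sketch: the plan the loop above produces when an intermediate value is also
# given as an input (same shape as the pinning tests further down in this diff).
from operator import add
from graphkit import compose, operation

pin_pipe = compose(name="pin_demo")(
    operation(name="must run", needs=["a"], provides=["overriden", "calced"])(lambda x: (x, 2 * x)),
    operation(name="add", needs=["overriden", "calced"], provides=["asked"])(add),
)
pin_pipe({"a": 5, "overriden": 1}, ["asked"])

print(pin_pipe.net.execution_plan)
# roughly: [<must run>, DeleteInstruction('a'), PinInstruction('overriden'),
#           <add>, DeleteInstruction('overriden'), DeleteInstruction('calced')]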
+ unsatisfied.append(node) + else: + real_needs = set(n for n in node.needs + if not isinstance(n, optional)) + if real_needs.issubset(op_satisfaction[node]): + # We have a satisfied operation; mark its output-data + # as ok. + ok_data.update(dag.adj[node]) + else: + # Prune operations with partial inputs. + unsatisfied.append(node) + elif isinstance(node, (DataPlaceholderNode, str)): # `str` are givens + if node in ok_data: + # mark satisfied-needs on all future operations + for future_op in dag.adj[node]: + op_satisfaction[future_op].add(node) + else: + raise AssertionError("Unrecognized network graph node %r" % node) + + return unsatisfied + + + def _prune_dag(self, outputs, inputs): + """ + Determines what graph steps need to run to get to the requested + outputs from the provided inputs. : + - Eliminate steps that are not on a path arriving to requested outputs. + - Eliminate unsatisfied operations: partial inputs or no outputs needed. + + :param iterable outputs: A list of desired output names. This can also be ``None``, in which case the necessary steps are all graph nodes that are reachable from one of the provided inputs. - :param dict inputs: - A dictionary mapping names to values for all provided inputs. + :param iterable inputs: + The inputs names of all given inputs. - :returns: - Returns a list of all the steps that need to be run for the - provided inputs and requested outputs. + :return: + the *pruned_dag* """ + dag = self.graph + + # Ignore input names that aren't in the graph. + graph_inputs = iset(dag.nodes) & inputs # preserve order + + # Scream if some requested outputs aren't in the graph. + unknown_outputs = iset(outputs) - dag.nodes + if unknown_outputs: + raise ValueError( + "Unknown output node(s) requested: %s" + % ", ".join(unknown_outputs)) + + broken_dag = dag.copy() # preserve net's graph + + # Break the incoming edges to all given inputs. + # + # Nodes producing any given intermediate inputs are unecessary + # (unless they are also used elsewhere). + # To discover which ones to prune, we break their incoming edges + # and they will drop out while collecting ancestors from the outputs. + for given in graph_inputs: + broken_dag.remove_edges_from(list(broken_dag.in_edges(given))) + + if outputs: + # If caller requested specific outputs, we can prune any + # unrelated nodes further up the dag. + ending_in_outputs = set() + for output_name in outputs: + ending_in_outputs.add(DataPlaceholderNode(output_name)) + ending_in_outputs.update(nx.ancestors(dag, output_name)) + broken_dag = broken_dag.subgraph(ending_in_outputs) - # return steps if it has already been computed before for this set of inputs and outputs - outputs = tuple(sorted(outputs)) if isinstance(outputs, (list, set)) else outputs - inputs_keys = tuple(sorted(inputs.keys())) - cache_key = (inputs_keys, outputs) - if cache_key in self._necessary_steps_cache: - return self._necessary_steps_cache[cache_key] - graph = self.graph - if not outputs: + # Prune unsatisfied operations (those with partial inputs or no outputs). + unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs) + # Clone it so that it is picklable. + pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied) - # If caller requested all outputs, the necessary nodes are all - # nodes that are reachable from one of the inputs. Ignore input - # names that aren't in the graph. 
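# Sketch: the collection above is what lets a partially-fed pipeline run instead
# of failing (mirrors test_unsatisfied_operations further down in this diff).
from operator import add, sub
from graphkit import compose, operation

partial = compose(name="partial")(
    operation(name="add", needs=["a", "b1"], provides=["a+b1"])(add),
    operation(name="sub", needs=["a", "b2"], provides=["a-b2"])(sub),
)
# "b2" is missing, so "sub" is collected as unsatisfied and pruned:
partial({"a": 10, "b1": 2})    # == {"a": 10, "b1": 2, "a+b1": 12}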
- necessary_nodes = set() - for input_name in iter(inputs): - if graph.has_node(input_name): - necessary_nodes |= nx.descendants(graph, input_name) + return pruned_dag.copy() # clone so that it is picklable - else: + assert all( + isinstance(n, (Operation, DataPlaceholderNode)) for n in pruned_dag + ), pruned_dag - # If the caller requested a subset of outputs, find any nodes that - # are made unecessary because we were provided with an input that's - # deeper into the network graph. Ignore input names that aren't - # in the graph. - unnecessary_nodes = set() - for input_name in iter(inputs): - if graph.has_node(input_name): - unnecessary_nodes |= nx.ancestors(graph, input_name) - - # Find the nodes we need to be able to compute the requested - # outputs. Raise an exception if a requested output doesn't - # exist in the graph. - necessary_nodes = set() - for output_name in outputs: - if not graph.has_node(output_name): - raise ValueError("graphkit graph does not have an output " - "node named %s" % output_name) - necessary_nodes |= nx.ancestors(graph, output_name) + return pruned_dag.copy() + + def compile(self, outputs=(), inputs=()): + """ + Solve dag, set the :attr:`execution_plan`, and cache it. + + See :meth:`_prune_dag()` for detailed description. + + :param iterable outputs: + A list of desired output names. This can also be ``None``, in which + case the necessary steps are all graph nodes that are reachable + from one of the provided inputs. - # Get rid of the unnecessary nodes from the set of necessary ones. - necessary_nodes -= unnecessary_nodes + :param dict inputs: + The input names of all given inputs. + """ + # return steps if it has already been computed before for this set of inputs and outputs + if outputs is not None and not isinstance(outputs, str): + outputs = tuple(sorted(outputs)) + inputs_keys = tuple(sorted(inputs)) + cache_key = (inputs_keys, outputs) + + if cache_key in self._cached_compilations: + dag, plan = self._cached_compilations[cache_key] + else: + dag = self._prune_dag(outputs, inputs) + plan = self._build_execution_plan(dag, inputs, outputs) - necessary_steps = [step for step in self.steps if step in necessary_nodes] + # Cache compilation results to speed up future runs + # with different values (but same number of inputs/outputs). + self._cached_compilations[cache_key] = dag, plan - # save this result in a precomputed cache for future lookup - self._necessary_steps_cache[cache_key] = necessary_steps + ## TODO: Extract into Solution class + self.execution_dag = dag + self.execution_plan = plan - # Return an ordered list of the needed steps. - return necessary_steps - def compute(self, outputs, named_inputs, method=None): + def compute( + self, outputs, named_inputs, method=None, overwrites_collector=None): """ - Run the graph. Any inputs to the network must be passed in by name. + Solve & execute the graph, sequentially or parallel. :param list output: The names of the data node you'd like to have returned once all necessary computations are complete. @@ -228,27 +436,58 @@ def compute(self, outputs, named_inputs, method=None): and the values are the concrete values you want to set for the data node. + :param method: + if ``"parallel"``, launches multi-threading. + Set when invoking a composed graph or by + :meth:`~NetworkOperation.set_execution_method()`. + + :param overwrites_collector: + (optional) a mutable dict to be fillwed with named values. + If missing, values are simply discarded. :returns: a dictionary of output data objects, keyed by name. 
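# Sketch of the caching behaviour of compile() above: the key depends only on
# *which* names are given/asked (not their values), so repeated runs with the
# same shape of inputs/outputs reuse the pruned dag and the plan.
net = partial.net                          # `partial` from the previous sketch
net.compute(["a+b1"], {"a": 1, "b1": 2})
net.compute(["a+b1"], {"a": 7, "b1": 9})   # second call reuses the cached compilation
(('a', 'b1'), ('a+b1',)) in net._cached_compilations   # True: keyed by names only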
""" - # assert that network has been compiled - assert self.steps, "network must be compiled before calling compute." assert isinstance(outputs, (list, tuple)) or outputs is None,\ "The outputs argument must be a list" + # start with fresh data cache & overwrites + cache = named_inputs.copy() + + # Build and set :attr:`execution_plan`. + self.compile(outputs, named_inputs.keys()) # choose a method of execution if method == "parallel": - return self._compute_thread_pool_barrier_method(named_inputs, - outputs) + self._execute_thread_pool_barrier_method( + cache, overwrites_collector, named_inputs) else: - return self._compute_sequential_method(named_inputs, - outputs) + self._execute_sequential_method( + cache, overwrites_collector, named_inputs) + if not outputs: + # Return the whole cache as output, including input and + # intermediate data nodes. + result = cache - def _compute_thread_pool_barrier_method(self, named_inputs, outputs, - thread_pool_size=10): + else: + # Filter outputs to just return what's needed. + # Note: list comprehensions exist in python 2.7+ + result = dict(i for i in cache.items() if i[0] in outputs) + + return result + + + def _pin_data_in_cache(self, value_name, cache, inputs, overwrites): + value_name = str(value_name) + if overwrites is not None: + overwrites[value_name] = cache[value_name] + cache[value_name] = inputs[value_name] + + + def _execute_thread_pool_barrier_method( + self, cache, overwrites, inputs, thread_pool_size=10 + ): """ This method runs the graph using a parallel pool of thread executors. You may achieve lower total latency if your graph is sufficiently @@ -261,12 +500,9 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, self._thread_pool = Pool(thread_pool_size) pool = self._thread_pool - cache = {} - cache.update(named_inputs) - necessary_nodes = self._find_necessary_steps(outputs, named_inputs) # this keeps track of all nodes that have already executed - has_executed = set() + executed_nodes = set() # unordered, not iterated # with each loop iteration, we determine a set of operations that can be # scheduled, then schedule them onto a thread pool, then collect their @@ -276,22 +512,30 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, # the upnext list contains a list of operations for scheduling # in the current round of scheduling upnext = [] - for node in necessary_nodes: - # only delete if all successors for the data node have been executed - if isinstance(node, DeleteInstruction): - if ready_to_delete_data_node(node, - has_executed, - self.graph): - if node in cache: - cache.pop(node) - - # continue if this node is anything but an operation node - if not isinstance(node, Operation): - continue - - if ready_to_schedule_operation(node, has_executed, self.graph) \ - and node not in has_executed: + for node in self.execution_plan: + if ( + isinstance(node, Operation) + and self._can_schedule_operation(node, executed_nodes) + and node not in executed_nodes + ): upnext.append(node) + elif isinstance(node, DeleteInstruction): + # Only delete if all successors for the data node + # have been executed. + # An optional need may not have a value in the cache. + if ( + node in cache + and self._can_evict_value(node, executed_nodes) + ): + if self._debug: + print("removing data '%s' from cache." % node) + del cache[node] + elif isinstance(node, PinInstruction): + # Always and repeatedely pin the value, even if not all + # providers of the data have executed. 
+ # An optional need may not have a value in the cache. + if node in cache: + self._pin_data_in_cache(node, cache, inputs, overwrites) # stop if no nodes left to schedule, exit out of the loop @@ -303,29 +547,15 @@ def _compute_thread_pool_barrier_method(self, named_inputs, outputs, upnext) for op, result in done_iterator: cache.update(result) - has_executed.add(op) + executed_nodes.add(op) - if not outputs: - return cache - else: - return {k: cache[k] for k in iter(cache) if k in outputs} - def _compute_sequential_method(self, named_inputs, outputs): + def _execute_sequential_method(self, cache, overwrites, inputs): """ This method runs the graph one operation at a time in a single thread """ - # start with fresh data cache - cache = {} - - # add inputs to data cache - cache.update(named_inputs) - - # Find the subset of steps we need to run to get to the requested - # outputs from the provided inputs. - all_steps = self._find_necessary_steps(outputs, named_inputs) - self.times = {} - for step in all_steps: + for step in self.execution_plan: if isinstance(step, Operation): @@ -348,31 +578,17 @@ def _compute_sequential_method(self, named_inputs, outputs): if self._debug: print("step completion time: %s" % t_complete) - # Process DeleteInstructions by deleting the corresponding data - # if possible. elif isinstance(step, DeleteInstruction): + # Cache value may be missing if it is optional. + if step in cache: + if self._debug: + print("removing data '%s' from cache." % step) + del cache[step] - if outputs and step not in outputs: - # Some DeleteInstruction steps may not exist in the cache - # if they come from optional() needs that are not privoded - # as inputs. Make sure the step exists before deleting. - if step in cache: - if self._debug: - print("removing data '%s' from cache." % step) - cache.pop(step) - + elif isinstance(step, PinInstruction): + self._pin_data_in_cache(step, cache, inputs, overwrites) else: - raise TypeError("Unrecognized instruction.") - - if not outputs: - # Return the whole cache as output, including input and - # intermediate data nodes. - return cache - - else: - # Filter outputs to just return what's needed. - # Note: list comprehensions exist in python 2.7+ - return {k: cache[k] for k in iter(cache) if k in outputs} + raise AssertionError("Unrecognized instruction.%r" % step) def plot(self, filename=None, show=False): @@ -406,7 +622,7 @@ def get_node_name(a): g = pydot.Dot(graph_type="digraph") # draw nodes - for nx_node in self.graph.nodes(): + for nx_node in self.graph.nodes: if isinstance(nx_node, DataPlaceholderNode): node = pydot.Node(name=nx_node, shape="rect") else: @@ -422,8 +638,8 @@ def get_node_name(a): # save plot if filename: - basename, ext = os.path.splitext(filename) - with open(filename, "w") as fh: + _basename, ext = os.path.splitext(filename) + with open(filename, "wb") as fh: if ext.lower() == ".png": fh.write(g.create_png()) elif ext.lower() == ".dot": @@ -448,49 +664,45 @@ def get_node_name(a): return g -def ready_to_schedule_operation(op, has_executed, graph): - """ - Determines if a Operation is ready to be scheduled for execution based on - what has already been executed. + def _can_schedule_operation(self, op, executed_nodes): + """ + Determines if a Operation is ready to be scheduled for execution + + based on what has already been executed. 
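# Sketch of the readiness test defined above: an operation may be scheduled once
# every *Operation* among its ancestors in the pruned dag has already executed
# (data nodes in between are ignored).
import networkx as nx

dag = nx.DiGraph()
dag.add_edge("op1", "ab")
dag.add_edge("ab", "op2")                   # op1 -> "ab" -> op2

deps = nx.ancestors(dag, "op2") - {"ab"}    # the real code keeps Operation nodes only
assert not deps.issubset(set())             # op2 is not ready before op1 ran
assert deps.issubset({"op1"})               # ready once op1 is in executed_nodes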
- Args: - op: + :param op: The Operation object to check - has_executed: set + :param set executed_nodes A set containing all operations that have been executed so far - graph: - The networkx graph containing the operations and data nodes - Returns: - A boolean indicating whether the operation may be scheduled for - execution based on what has already been executed. - """ - dependencies = set(filter(lambda v: isinstance(v, Operation), - nx.ancestors(graph, op))) - return dependencies.issubset(has_executed) + :return: + A boolean indicating whether the operation may be scheduled for + execution based on what has already been executed. + """ + # unordered, not iterated + dependencies = set(n for n in nx.ancestors(self.execution_dag, op) + if isinstance(n, Operation)) + return dependencies.issubset(executed_nodes) -def ready_to_delete_data_node(name, has_executed, graph): - """ - Determines if a DataPlaceholderNode is ready to be deleted from the - cache. - Args: - name: + def _can_evict_value(self, name, executed_nodes): + """ + Determines if a DataPlaceholderNode is ready to be deleted from cache. + + :param name: The name of the data node to check - has_executed: set + :param executed_nodes: set A set containing all operations that have been executed so far - graph: - The networkx graph containing the operations and data nodes - Returns: - A boolean indicating whether the data node can be deleted or not. - """ - data_node = get_data_node(name, graph) - return set(graph.successors(data_node)).issubset(has_executed) + :return: + A boolean indicating whether the data node can be deleted or not. + """ + data_node = self.get_data_node(name) + return data_node and set( + self.execution_dag.successors(data_node)).issubset(executed_nodes) -def get_data_node(name, graph): - """ - Gets a data node from a graph using its name - """ - for node in graph.nodes(): - if node == name and isinstance(node, DataPlaceholderNode): - return node - return None + def get_data_node(self, name): + """ + Retuen the data node from a graph using its name, or None. 
+ """ + for node in self.graph.nodes: + if node == name and isinstance(node, DataPlaceholderNode): + return node diff --git a/setup.py b/setup.py index bd7883f4..46a69077 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,11 @@ author_email='huyng@yahoo-inc.com', url='http://github.com/yahoo/graphkit', packages=['graphkit'], - install_requires=['networkx'], + install_requires=[ + "networkx; python_version >= '3.5'", + "networkx == 2.2; python_version < '3.5'", + "boltons" # for IndexSet + ], extras_require={ 'plot': ['pydot', 'matplotlib'] }, diff --git a/test/test_graphkit.py b/test/test_graphkit.py index bd97b317..b7bb329b 100644 --- a/test/test_graphkit.py +++ b/test/test_graphkit.py @@ -5,12 +5,33 @@ import pickle from pprint import pprint -from operator import add +from operator import add, sub, floordiv, mul from numpy.testing import assert_raises import graphkit.network as network import graphkit.modifiers as modifiers from graphkit import operation, compose, Operation +from graphkit.network import DeleteInstruction + + +def scream(*args, **kwargs): + raise AssertionError( + "Must not have run!\n args: %s\n kwargs: %s", (args, kwargs)) + + +def identity(x): + return x + + +def filtdict(d, *keys): + """ + Keep dict items with the given keys + + >>> filtdict({"a": 1, "b": 2}, "b") + {"b": 2} + """ + return type(d)(i for i in d.items() if i[0] in keys) + def test_network(): @@ -184,6 +205,222 @@ def test_pruning_raises_for_bad_output(): outputs=['sum1', 'sum3', 'sum4']) +def test_pruning_not_overrides_given_intermediate(): + # Test #25: v1.2.4 overwrites intermediate data when no output asked + pipeline = compose(name="pipeline")( + operation(name="unjustly run", needs=["a"], provides=["overriden"])(scream), + operation(name="op", needs=["overriden", "c"], provides=["asked"])(add), + ) + + exp = {"a": 5, "overriden": 1, "c": 2, "asked": 3} + # v1.2.4.ok + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + # FAILs + # - on v1.2.4 with (overriden, asked): = (5, 7) instead of (1, 3) + # - on #18(unsatisfied) + #23(ordered-sets) with (overriden, asked) = (5, 7) instead of (1, 3) + # FIXED on #26 + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + + ## Test OVERWITES + # + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {} # unjust must have been pruned + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert overwrites == {} # unjust must have been pruned + + ## Test Parallel + # + pipeline.set_execution_method("parallel") + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + #assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {} # unjust must have been pruned + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert overwrites == {} # unjust must have been pruned + + +def test_pruning_multiouts_not_override_intermediates1(): + # Test #25: v.1.2.4 overwrites intermediate data when a previous operation + # must run for its other outputs (outputs asked or not) + pipeline = compose(name="pipeline")( + operation(name="must run", needs=["a"], provides=["overriden", "calced"]) + (lambda x: (x, 2 * x)), + operation(name="add", needs=["overriden", "calced"], provides=["asked"])(add), + ) + + exp = 
{"a": 5, "overriden": 1, "calced": 10, "asked": 11} + # FAILs + # - on v1.2.4 with (overriden, asked) = (5, 15) instead of (1, 11) + # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. + # FIXED on #26 + assert pipeline({"a": 5, "overriden": 1}) == exp + # FAILs + # - on v1.2.4 with KeyError: 'e', + # - on #18(unsatisfied) + #23(ordered-sets) with empty result. + # FIXED on #26 + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + + ## Test OVERWITES + # + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1}) == exp + assert overwrites == {'overriden': 5} + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {'overriden': 5} + + ## Test parallel + # + pipeline.set_execution_method("parallel") + assert pipeline({"a": 5, "overriden": 1}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + + +def test_pruning_multiouts_not_override_intermediates2(): + # Test #25: v.1.2.4 overrides intermediate data when a previous operation + # must run for its other outputs (outputs asked or not) + pipeline = compose(name="pipeline")( + operation(name="must run", needs=["a"], provides=["overriden", "e"]) + (lambda x: (x, 2 * x)), + operation(name="op1", needs=["overriden", "c"], provides=["d"])(add), + operation(name="op2", needs=["d", "e"], provides=["asked"])(mul), + ) + + exp = {"a": 5, "overriden": 1, "c": 2, "d": 3, "e": 10, "asked": 30} + # FAILs + # - on v1.2.4 with (overriden, asked) = (5, 70) instead of (1, 13) + # - on #18(unsatisfied) + #23(ordered-sets) like v1.2.4. + # FIXED on #26 + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + # FAILs + # - on v1.2.4 with KeyError: 'e', + # - on #18(unsatisfied) + #23(ordered-sets) with empty result. + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + # FIXED on #26 + + ## Test OVERWITES + # + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert overwrites == {'overriden': 5} + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {'overriden': 5} + + ## Test parallel + # + pipeline.set_execution_method("parallel") + assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp + assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked") + + +def test_pruning_with_given_intermediate_and_asked_out(): + # Test #24: v1.2.4 does not prune before given intermediate data when + # outputs not asked, but does so when output asked. + pipeline = compose(name="pipeline")( + operation(name="unjustly pruned", needs=["given-1"], provides=["a"])(identity), + operation(name="shortcuted", needs=["a", "b"], provides=["given-2"])(add), + operation(name="good_op", needs=["a", "given-2"], provides=["asked"])(add), + ) + + exp = {"given-1": 5, "b": 2, "given-2": 2, "a": 5, "asked": 7} + # v1.2.4 is ok + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + # FAILS + # - on v1.2.4 with KeyError: 'a', + # - on #18 (unsatisfied) with no result. + # FIXED on #18+#26 (new dag solver). 
+ assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + + ## Test OVERWITES + # + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + assert overwrites == {} + + overwrites = {} + pipeline.set_overwrites_collector(overwrites) + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + assert overwrites == {} + + ## Test parallel + # FAIL! in #26! + # + pipeline.set_execution_method("parallel") + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp + assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked") + +def test_unsatisfied_operations(): + # Test that operations with partial inputs are culled and not failing. + pipeline = compose(name="pipeline")( + operation(name="add", needs=["a", "b1"], provides=["a+b1"])(add), + operation(name="sub", needs=["a", "b2"], provides=["a-b2"])(sub), + ) + + exp = {"a": 10, "b1": 2, "a+b1": 12} + assert pipeline({"a": 10, "b1": 2}) == exp + assert pipeline({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") + + exp = {"a": 10, "b2": 2, "a-b2": 8} + assert pipeline({"a": 10, "b2": 2}) == exp + assert pipeline({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") + + ## Test parallel + # + pipeline.set_execution_method("parallel") + exp = {"a": 10, "b1": 2, "a+b1": 12} + assert pipeline({"a": 10, "b1": 2}) == exp + assert pipeline({"a": 10, "b1": 2}, outputs=["a+b1"]) == filtdict(exp, "a+b1") + + exp = {"a": 10, "b2": 2, "a-b2": 8} + assert pipeline({"a": 10, "b2": 2}) == exp + assert pipeline({"a": 10, "b2": 2}, outputs=["a-b2"]) == filtdict(exp, "a-b2") + +def test_unsatisfied_operations_same_out(): + # Test unsatisfied pairs of operations providing the same output. + pipeline = compose(name="pipeline")( + operation(name="mul", needs=["a", "b1"], provides=["ab"])(mul), + operation(name="div", needs=["a", "b2"], provides=["ab"])(floordiv), + operation(name="add", needs=["ab", "c"], provides=["ab_plus_c"])(add), + ) + + exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} + assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + + exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} + assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + + ## Test parallel + # + # FAIL! in #26 + pipeline.set_execution_method("parallel") + exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21} + assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + # + # FAIL! in #26 + exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6} + assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp + assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c") + + def test_optional(): # Test that optional() needs work as expected. @@ -226,22 +463,100 @@ def addplusplus(a, b, c=0): assert 'sum2' in results +def test_deleteinstructs_vary_with_inputs(): + # Check #21: DeleteInstructions positions vary when inputs change. 
+ def count_deletions(steps): + return sum(isinstance(n, DeleteInstruction) for n in steps) + + pipeline = compose(name="pipeline")( + operation(name="a free without b", needs=["a"], provides=["aa"])(identity), + operation(name="satisfiable", needs=["a", "b"], provides=["ab"])(add), + operation(name="optional ab", needs=["aa", modifiers.optional("ab")], provides=["asked"]) + (lambda a, ab=10: a + ab), + ) + + inp = {"a": 2, "b": 3} + exp = inp.copy(); exp.update({"aa": 2, "ab": 5, "asked": 7}) + res = pipeline(inp) + assert res == exp # ok + steps11 = pipeline.net.execution_plan + res = pipeline(inp, outputs=["asked"]) + assert res == filtdict(exp, "asked") # ok + steps12 = pipeline.net.execution_plan + + inp = {"a": 2} + exp = inp.copy(); exp.update({"aa": 2, "asked": 12}) + res = pipeline(inp) + assert res == exp # ok + steps21 = pipeline.net.execution_plan + res = pipeline(inp, outputs=["asked"]) + assert res == filtdict(exp, "asked") # ok + steps22 = pipeline.net.execution_plan + + # When no outs, no del-instructs. + assert steps11 != steps12 + assert count_deletions(steps11) == 0 + assert steps21 != steps22 + assert count_deletions(steps21) == 0 + + # Check steps vary with inputs + # + # FAILs in v1.2.4 + #18, PASS in #26 + assert steps11 != steps21 + + # Check deletes vary with inputs + # + # FAILs in v1.2.4 + #18, PASS in #26 + assert count_deletions(steps12) != count_deletions(steps22) + + +def test_multithreading_plan_execution(): + # From Huygn's test-code given in yahoo/graphkit#31 + from multiprocessing.dummy import Pool + from graphkit import compose, operation + + # Computes |a|^p. + def abspow(a, p): + c = abs(a) ** p + return c + + # Compose the mul, sub, and abspow operations into a computation graph. + graph = compose(name="graph")( + operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul), + operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub), + operation( + name="abspow1", + needs=["a_minus_ab"], + provides=["abs_a_minus_ab_cubed"], + params={"p": 3}, + )(abspow), + ) + + pool = Pool(10) + graph.set_execution_method("parallel") + pool.map( + lambda i: graph({"a": 2, "b": 5}, ["a_minus_ab", "abs_a_minus_ab_cubed"]), + range(100), + ) + def test_parallel_execution(): import time + delay = 0.5 + def fn(x): - time.sleep(1) + time.sleep(delay) print("fn %s" % (time.time() - t0)) return 1 + x def fn2(a,b): - time.sleep(1) + time.sleep(delay) print("fn2 %s" % (time.time() - t0)) return a+b def fn3(z, k=1): - time.sleep(1) + time.sleep(delay) print("fn3 %s" % (time.time() - t0)) return z + k @@ -280,6 +595,7 @@ def fn3(z, k=1): # make sure results are the same using either method assert result_sequential == result_threaded + def test_multi_threading(): import time import random @@ -310,8 +626,8 @@ def infer(i): assert tuple(sorted(results.keys())) == tuple(sorted(outputs)), (outputs, results) return results - N = 100 - for i in range(20, 200): + N = 33 + for i in range(13, 61): pool = Pool(i) pool.map(infer, range(N)) pool.close() @@ -353,6 +669,7 @@ def compute(self, inputs): outputs.append(p) return outputs + def test_backwards_compatibility(): sum_op1 = Sum(