5. API Reference¶
graphtik |
Lightweight computation graphs for Python. |
graphtik.op |
About operation nodes (but not net-ops to break cycle). |
graphtik.modifiers |
Modifiers change the behavior of specific needs or provides. |
graphtik.netop |
About network operations (those based on graphs) |
graphtik.network |
Compile & execute network graphs of operations. |
graphtik.plot |
|
graphtik.config |
Configurations for network execution, and utilities on them. |
graphtik.base |
Generic or specific utilities |
Module: op¶
About operation nodes (but not net-ops to break cycle).
-
class
graphtik.op.
FunctionalOperation
(fn: Callable, name, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None)[source]¶ An operation performing a callable (ie a function, a method, a lambda).
Parameters: - provides – Value names this operation provides (including aliases/sideffects).
- real_provides –
Value names the underlying function provides (without aliases, with(!) sideffects).
FIXME: real_provides not sure what it does with sideffects
Tip
Use
operation()
builder class to build instances of this class instead.-
compute
(named_inputs, outputs=None) → dict[source]¶ Compute (optional) asked outputs for the given named_inputs.
It is called by
Network
. End-users should simply call the operation with named_inputs as kwargs.Parameters: named_inputs – the input values with which to feed the computation. Returns list: Should return a list values representing the results of running the feed-forward computation on inputs
.
-
class
graphtik.op.
Operation
[source]¶ An abstract class representing an action with
compute()
.-
compute
(named_inputs, outputs=None)[source]¶ Compute (optional) asked outputs for the given named_inputs.
It is called by
Network
. End-users should simply call the operation with named_inputs as kwargs.Parameters: named_inputs – the input values with which to feed the computation. Returns list: Should return a list values representing the results of running the feed-forward computation on inputs
.
-
-
graphtik.op.
as_renames
(i, argname)[source]¶ parses a list of (source–>destination) from dict, list-of-2-items, single 2-tuple.
-
class
graphtik.op.
operation
(fn: Callable = None, *, name=None, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None)[source]¶ A builder for graph-operations wrapping functions.
Parameters: - fn (function) – The function used by this operation. This does not need to be
specified when the operation object is instantiated and can instead
be set via
__call__
later. - name (str) – The name of the operation in the computation graph.
- needs –
The list of (positionally ordered) names of the data needed by the operation to receive as inputs, roughly corresponding to the arguments of the underlying fn.
- provides –
Names of output data this operation provides, which must correspond to the returned values of the fn. If more than one given, those must be returned in an iterable, unless returns_dict is true, in which case a dictionary with (at least) as many elements must be returned.
- aliases – an optional mapping of provides to additional ones
- rescheduled – If true, underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to returns_dict.
- endured – If true, even if callable fails, solution will reschedule. ignored if endurance enabled globally.
- parallel – execute in parallel
- marshalled – If true, operation will be marshalled while computed, along with its inputs & outputs. (usefull when run in parallel with a process pool).
- returns_dict – if true, it means the fn returns dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides)
- node_props – added as-is into NetworkX graph
Returns: when called, it returns a
FunctionalOperation
Example:
This is an example of its use, based on the “builder pattern”:
>>> from graphtik import operation >>> opb = operation(name='add_op') >>> opb.withset(needs=['a', 'b']) operation(name='add_op', needs=['a', 'b'], provides=[], fn=None) >>> opb.withset(provides='SUM', fn=sum) operation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')
You may keep calling
withset()
till you invoke a final__call__()
on the builder; then you get the actualFunctionalOperation
instance:>>> # Create `Operation` and overwrite function at the last moment. >>> opb(sum) FunctionalOperation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')
Tip
Remember to call once more the builder class at the end, to get the actual operation instance.
-
withset
(*, fn: Callable = None, name=None, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None) → graphtik.op.operation[source]¶ See
operation
for arguments here.
- fn (function) – The function used by this operation. This does not need to be
specified when the operation object is instantiated and can instead
be set via
Module: modifiers¶
Modifiers change the behavior of specific needs or provides.
The needs and provides annotated with modifiers designate, for instance, optional function arguments, or “ghost” sideffects.
-
class
graphtik.modifiers.
arg
[source]¶ Annotate a needs to map from its name in the inputs to a different argument-name.
Parameters: fn_arg – The argument-name corresponding to this named-input.
Note
This extra mapping argument is needed either for optionals or for functions with keywords-only arguments (like
def func(*, foo, bar): ...
), since inputs` are normally fed into functions by-position, not by-name.Example:
In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of
arg
(oroptional
):>>> from graphtik import operation, compose, arg
>>> def myadd(a, *, b): ... return a + b
>>> graph = compose('mygraph', ... operation(name='myadd', ... needs=['a', arg("name-in-inputs", "b")], ... provides="sum")(myadd) ... ) >>> graph NetworkOperation('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops: +--FunctionalOperation(name='myadd', needs=['a', arg('name-in-inputs'-->'b')], provides=['sum'], fn='myadd')) >>> graph.compute({"a": 5, "name-in-inputs": 4})['sum'] 9
-
class
graphtik.modifiers.
optional
[source]¶ Annotate optionals needs corresponding to defaulted op-function arguments, …
received only if present in the inputs (when operation is invocated). The value of an optional is passed as a keyword argument to the underlying function.
Example:
>>> from graphtik import operation, compose, optional
>>> def myadd(a, b=0): ... return a + b
Annotate
b
as optional argument (and notice it’s default value0
):>>> graph = compose('mygraph', ... operation(name='myadd', ... needs=["a", optional("b")], ... provides="sum")(myadd) ... ) >>> graph NetworkOperation('mygraph', needs=['a', optional('b')], provides=['sum'], x1 ops: ...
The graph works both with and without
c
provided in the inputs:>>> graph(a=5, b=4)['sum'] 9 >>> graph(a=5) {'a': 5, 'sum': 5}
Like
arg
you may map input-name to a different function-argument:>>> graph = compose('mygraph', ... operation(name='myadd', ... needs=['a', optional("quasi-real", "b")], ... provides="sum")(myadd) ... ) >>> graph NetworkOperation('mygraph', needs=['a', optional('quasi-real')], provides=['sum'], x1 ops: +--FunctionalOperation(name='myadd', needs=['a', optional('quasi-real'-->'b')], provides=['sum'], fn='myadd')) >>> graph.compute({"a": 5, "quasi-real": 4})['sum'] 9
-
class
graphtik.modifiers.
sideffect
[source]¶ sideffects dependencies participates in the graph but not exchanged with functions.
Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (compilation) but they do not interact with the operation’s function; specifically:
- input sideffects must exist in the inputs for an operation to kick-in;
- input sideffects are NOT fed into the function;
- output sideffects are NOT expected from the function;
- output sideffects are stored in the solution.
Their purpose is to describe operations that modify the internal state of some of their arguments (“side-effects”).
Example:
A typical use-case is to signify columns required to produce new ones in pandas dataframes:
>>> from graphtik import operation, compose, sideffect
>>> # Function appending a new dataframe column from two pre-existing ones. >>> def addcolumns(df): ... df['sum'] = df['a'] + df['b']
Designate
a
,b
&sum
column names as an sideffect arguments:>>> graph = compose('mygraph', ... operation( ... name='addcolumns', ... needs=['df', sideffect('df.b')], # sideffect names can be anything ... provides=[sideffect('df.sum')])(addcolumns) ... ) >>> graph NetworkOperation('mygraph', needs=['df', 'sideffect(df.b)'], provides=['sideffect(df.sum)'], x1 ops: +--FunctionalOperation(name='addcolumns', needs=['df', 'sideffect(df.b)'], provides=['sideffect(df.sum)'], fn='addcolumns'))
>>> df = pd.DataFrame({'a': [5, 0], 'b': [2, 1]}) # doctest: +SKIP >>> graph({'df': df})['df'] # doctest: +SKIP a b 0 5 2 1 0 1
We didn’t get the
sum
column because theb
sideffect was unsatisfied. We have to add its key to the inputs (with any value):>>> graph({'df': df, sideffect("df.b"): 0})['df'] # doctest: +SKIP a b sum 0 5 2 7 1 0 1 1
Note that regular data in needs and provides do not match same-named sideffects. That is, in the following operation, the
prices
input is different from thesideffect(prices)
output:>>> def upd_prices(sales_df, prices): ... sales_df["Prices"] = prices
>>> operation(fn=upd_prices, ... name="upd_prices", ... needs=["sales_df", "price"], ... provides=[sideffect("price")]) operation(name='upd_prices', needs=['sales_df', 'price'], provides=['sideffect(price)'], fn='upd_prices')
Note
An operation with sideffects outputs only, have functions that return no value at all (like the one above). Such operation would still be called for their side-effects, if requested in outputs.
Tip
You may associate sideffects with other data to convey their relationships, simply by including their names in the string - in the end, it’s just a string - but no enforcement will happen from graphtik, like:
>>> sideffect("price[sales_df]") 'sideffect(price[sales_df])'
-
class
graphtik.modifiers.
vararg
[source]¶ Annotate optionals needs to be fed as op-function’s
*args
when present in inputs.See also
Consult also the example test-case in:
test/test_op.py:test_varargs()
, in the full sources of the project.Example:
>>> from graphtik import operation, compose, vararg
>>> def addall(a, *b): ... return a + sum(b)
Designate
b
&c
as an vararg arguments:>>> graph = compose( ... 'mygraph', ... operation( ... name='addall', ... needs=['a', vararg('b'), vararg('c')], ... provides='sum' ... )(addall) ... ) >>> graph NetworkOperation('mygraph', needs=['a', optional('b'), optional('c')], provides=['sum'], x1 ops: +--FunctionalOperation(name='addall', needs=['a', vararg('b'), vararg('c')], provides=['sum'], fn='addall'))
The graph works with and without any of
b
orc
inputs:>>> graph(a=5, b=2, c=4)['sum'] 11 >>> graph(a=5, b=2) {'a': 5, 'b': 2, 'sum': 7} >>> graph(a=5) {'a': 5, 'sum': 5}
-
class
graphtik.modifiers.
varargs
[source]¶ Like
vararg
, naming an optional iterable value in the inputs.See also
Consult also the example test-case in:
test/test_op.py:test_varargs()
, in the full sources of the project.Example:
>>> from graphtik import operation, compose, vararg
>>> def enlist(a, *b): ... return [a] + list(b)
>>> graph = compose('mygraph', ... operation(name='enlist', needs=['a', varargs('b')], ... provides='sum')(enlist) ... ) >>> graph NetworkOperation('mygraph', needs=['a', optional('b')], provides=['sum'], x1 ops: +--FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist'))
The graph works with or without b in the inputs:
>>> graph(a=5, b=[2, 20])['sum'] [5, 2, 20] >>> graph(a=5) {'a': 5, 'sum': [5]} >>> graph(a=5, b=0xBAD) Traceback (most recent call last): ... graphtik.base.MultiValueError: Failed preparing needs: 1. Expected needs[varargs('b')] to be non-str iterables! +++inputs: {'a': 5, 'b': 2989} +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')
Attention
To avoid user mistakes, it does not accept strings (though iterables):
>>> graph(a=5, b="mistake") Traceback (most recent call last): ... graphtik.base.MultiValueError: Failed preparing needs: 1. Expected needs[varargs('b')] to be non-str iterables! +++inputs: {'a': 5, 'b': 'mistake'} +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')
Module: netop¶
About network operations (those based on graphs)
-
class
graphtik.netop.
NetworkOperation
(operations, name, *, outputs=None, predicate: Callable[[Any, Mapping[KT, VT_co]], bool] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, merge=None, node_props=None)[source]¶ An operation that can compute a network-graph of operations.
Tip
Use
compose()
factory to prepare the net and build instances of this class.-
compile
(inputs=None, outputs=<UNSET>, predicate: Callable[[Any, Mapping[KT, VT_co]], bool] = <UNSET>) → graphtik.network.ExecutionPlan[source]¶ Produce a plan for the given args or outputs/predicate narrowed earlier.
Parameters: - named_inputs – a string or a list of strings that should be fed to the needs of all operations.
- outputs – A string or a list of strings with all data asked to compute.
If
None
, all possible intermediate outputs will be kept. If not given, those set by a previous call towithset()
or cstor are used. - predicate – Will be stored and applied on the next
compute()
orcompile()
. If not given, those set by a previous call towithset()
or cstor are used.
Returns: the execution plan satisfying the given inputs, outputs & predicate
Raises: If outputs asked do not exist in network, with msg:
Unknown output nodes: …
If solution does not contain any operations, with msg:
Unsolvable graph: …
If given inputs mismatched plan’s
needs
, with msg:Plan needs more inputs…
If outputs asked cannot be produced by the
dag
, with msg:Impossible outputs…
-
compute
(named_inputs: Mapping[KT, VT_co], outputs: Union[Collection[T_co], str, None] = <UNSET>, predicate: Callable[[Any, Mapping[KT, VT_co]], bool] = <UNSET>) → graphtik.network.Solution[source]¶ Compile a plan & execute the graph, sequentially or parallel.
Attention
If intermediate compilation is successful, the “global abort run flag is reset before the execution starts.
Parameters: - named_inputs – A maping of names –> values that will be fed to the needs of all operations. Cloned, not modified.
- outputs – A string or a list of strings with all data asked to compute.
If
None
, all intermediate data will be kept.
Returns: The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.
Raises: If outputs asked do not exist in network, with msg:
Unknown output nodes: …
If plan does not contain any operations, with msg:
Unsolvable graph: …
If given inputs mismatched plan’s
needs
, with msg:Plan needs more inputs…
If outputs asked cannot be produced by the
dag
, with msg:Impossible outputs…
See also
Operation.compute()
.
-
last_plan
= None[source]¶ The execution_plan of the last call to compute(), stored as debugging aid.
-
predicate
= None[source]¶ The node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.
-
withset
(outputs: Union[Collection[T_co], str, None] = <UNSET>, predicate: Callable[[Any, Mapping[KT, VT_co]], bool] = <UNSET>, *, name=None, rescheduled=None, endured=None, parallel=None, marshalled=None) → graphtik.netop.NetworkOperation[source]¶ Return a copy with a network pruned for the given needs & provides.
Parameters: - outputs – Will be stored and applied on the next
compute()
orcompile()
. If not given, the value of this instance is conveyed to the clone. - predicate – Will be stored and applied on the next
compute()
orcompile()
. If not given, the value of this instance is conveyed to the clone. - name –
the name for the new netop:
- if None, the same name is kept;
- if True, a distinct name is devised:
<old-name>-<uid>
- otherwise, the given name is applied.
- rescheduled – applies rescheduled to all contained operations
- endured – applies endurance to all contained operations
- parallel – mark all contained operations to be executed in parallel
- marshalled – mark all contained operations to be marshalled (usefull when run in parallel with a process pool).
Returns: A narrowed netop clone, which MIGHT be empty!*
Raises: If outputs asked do not exist in network, with msg:
Unknown output nodes: …
- outputs – Will be stored and applied on the next
-
-
graphtik.netop.
compose
(name, op1, *operations, outputs: Union[Collection[T_co], str, None] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, merge=False, node_props=None) → graphtik.netop.NetworkOperation[source]¶ Composes a collection of operations into a single computation graph, obeying the
merge
property, if set in the constructor.Parameters: - name (str) – A optional name for the graph being composed by this object.
- op1 – syntactically force at least 1 operation
- operations – Each argument should be an operation instance created using
operation
. - merge (bool) – If
True
, this compose object will attempt to merge togetheroperation
instances that represent entire computation graphs. Specifically, if one of theoperation
instances passed to thiscompose
object is itself a graph operation created by an earlier use ofcompose
the sub-operations in that graph are compared against other operations passed to thiscompose
instance (as well as the sub-operations of other graphs passed to thiscompose
instance). If any two operations are the same (based on name), then that operation is computed only once, instead of multiple times (one for each time the operation appears). - rescheduled – applies rescheduled to all contained operations
- endured – applies endurance to all contained operations
- parallel – mark all contained operations to be executed in parallel
- marshalled – mark all contained operations to be marshalled (usefull when run in parallel with a process pool).
- node_props – added as-is into NetworkX graph, to provide for filtering
by
NetworkOperation.withset()
.
Returns: Returns a special type of operation class, which represents an entire computation graph as a single operation.
Raises: ValueError – If the net` cannot produce the asked outputs from the given inputs.
Module: network¶
Compile & execute network graphs of operations.
-
exception
graphtik.network.
AbortedException
[source]¶ Raised from Network when
abort_run()
is called, and contains the solution …with any values populated so far.
-
class
graphtik.network.
ExecutionPlan
[source]¶ A pre-compiled list of operation steps that can execute for the given inputs/outputs.
It is the result of the network’s compilation phase.
Note the execution plan’s attributes are on purpose immutable tuples.
-
steps
[source]¶ The tuple of operation-nodes & instructions needed to evaluate the given inputs & asked outputs, free memory and avoid overwritting any given intermediate inputs.
-
asked_outs
[source]¶ When true, evictions may kick in (unless disabled by configurations), otherwise, evictions (along with prefect-evictions check) are skipped.
-
__dict__
= mappingproxy({'__module__': 'graphtik.network', '__doc__': "\n A pre-compiled list of operation steps that can :term:`execute` for the given inputs/outputs.\n\n It is the result of the network's :term:`compilation` phase.\n\n Note the execution plan's attributes are on purpose immutable tuples.\n\n .. attribute:: net\n\n The parent :class:`Network`\n .. attribute:: needs\n\n An :class:`iset` with the input names needed to exist in order to produce all `provides`.\n .. attribute:: provides\n\n An :class:`iset` with the outputs names produces when all `inputs` are given.\n .. attribute:: dag\n\n The regular (not broken) *pruned* subgraph of net-graph.\n .. attribute:: steps\n\n The tuple of operation-nodes & *instructions* needed to evaluate\n the given inputs & asked outputs, free memory and avoid overwritting\n any given intermediate inputs.\n .. attribute:: asked_outs\n\n When true, :term:`evictions` may kick in (unless disabled by :term:`configurations`),\n otherwise, *evictions* (along with prefect-evictions check) are skipped.\n ", '_build_pydot': <function ExecutionPlan._build_pydot>, '__repr__': <function ExecutionPlan.__repr__>, 'validate': <function ExecutionPlan.validate>, '_check_if_aborted': <function ExecutionPlan._check_if_aborted>, '_prepare_tasks': <function ExecutionPlan._prepare_tasks>, '_handle_task': <function ExecutionPlan._handle_task>, '_execute_thread_pool_barrier_method': <function ExecutionPlan._execute_thread_pool_barrier_method>, '_execute_sequential_method': <function ExecutionPlan._execute_sequential_method>, 'execute': <function ExecutionPlan.execute>, '__dict__': <attribute '__dict__' of 'ExecutionPlan' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc_data object>})[source]¶
-
_execute_sequential_method
(solution: graphtik.network.Solution)[source]¶ This method runs the graph one operation at a time in a single thread
Parameters: solution – must contain the input values only, gets modified
-
_execute_thread_pool_barrier_method
(solution: graphtik.network.Solution)[source]¶ This method runs the graph using a parallel pool of thread executors. You may achieve lower total latency if your graph is sufficiently sub divided into operations using this method.
Parameters: solution – must contain the input values only, gets modified
-
_handle_task
(future, op, solution) → None[source]¶ Un-dill parallel task results (if marshalled), and update solution / handle failure.
-
_prepare_tasks
(operations, solution, pool, global_parallel, global_marshal) → Union[Future, graphtik.network._OpTask, bytes][source]¶ Combine ops+inputs, apply marshalling, and submit to execution pool (or not) …
based on global/pre-op configs.
-
execute
(named_inputs, outputs=None, *, name='') → graphtik.network.Solution[source]¶ Parameters: - named_inputs – A maping of names –> values that must contain at least the compulsory inputs that were specified when the plan was built (but cannot enforce that!). Cloned, not modified.
- outputs – If not None, they are just checked if possible, based on
provides
, and scream if not.
Returns: The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.
Raises:
-
validate
(inputs: Union[Collection[T_co], str, None], outputs: Union[Collection[T_co], str, None])[source]¶ Scream on invalid inputs, outputs or no operations in graph.
Raises: ValueError –
-
-
exception
graphtik.network.
IncompleteExecutionError
[source]¶ Raised by
scream_if_incomplete()
when netop operations were canceled/failed.The exception contains 3 arguments:
- the causal errors and conditions (1st arg),
- the list of collected exceptions (2nd arg), and
- the solution instance (3rd argument), to interrogate for more.
-
class
graphtik.network.
Network
(*operations, graph=None)[source]¶ A graph of operations that can compile an execution plan.
-
__init__
(*operations, graph=None)[source]¶ Parameters: - operations – to be added in the graph
- graph – if None, create a new.
Raises: if dupe operation, with msg:
Operations may only be added once, …
-
_append_operation
(graph, operation: graphtik.op.Operation)[source]¶ Adds the given operation and its data requirements to the network graph.
- Invoked during constructor only (immutability).
- Identities are based on the name of the operation, the names of the operation’s needs, and the names of the data it provides.
- Adds needs, operation & provides, in that order.
Parameters: - graph – the networkx graph to append to
- operation – operation instance to append
-
_build_execution_steps
(pruned_dag, inputs: Collection[T_co], outputs: Optional[Collection[T_co]]) → List[T][source]¶ Create the list of operation-nodes & instructions evaluating all
operations & instructions needed a) to free memory and b) avoid overwritting given intermediate inputs.
Parameters: - pruned_dag – The original dag, pruned; not broken.
- outputs – outp-names to decide whether to add (and which) evict-instructions
Instances of
_EvictInstructions
are inserted in steps between operation nodes to reduce the memory footprint of solutions while the computation is running. An evict-instruction is inserted whenever a need is not used by any other operation further down the DAG.
-
_cached_plans
= None[source]¶ Speed up
compile()
call and avoid a multithreading issue(?) that is occuring when accessing the dag in networkx.
-
_prune_graph
(inputs: Union[Collection[T_co], str, None], outputs: Union[Collection[T_co], str, None], predicate: Callable[[Any, Mapping[KT, VT_co]], bool] = None) → Tuple[<sphinx.ext.autodoc.importer._MockObject object at 0x7f5721351d68>, Collection[T_co], Collection[T_co], Collection[T_co]][source]¶ Determines what graph steps need to run to get to the requested outputs from the provided inputs: - Eliminate steps that are not on a path arriving to requested outputs; - Eliminate unsatisfied operations: partial inputs or no outputs needed; - consolidate the list of needs & provides.
Parameters: - inputs – The names of all given inputs.
- outputs – The desired output names. This can also be
None
, in which case the necessary steps are all graph nodes that are reachable from the provided inputs. - predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.
Returns: a 3-tuple with the pruned_dag & the needs/provides resolved based on the given inputs/outputs (which might be a subset of all needs/outputs of the returned graph).
Use the returned needs/provides to build a new plan.
Raises: if outputs asked do not exist in network, with msg:
Unknown output nodes: …
-
_topo_sort_nodes
(dag) → List[T][source]¶ Topo-sort dag respecting operation-insertion order to break ties.
-
compile
(inputs: Union[Collection[T_co], str, None] = None, outputs: Union[Collection[T_co], str, None] = None, predicate=None) → graphtik.network.ExecutionPlan[source]¶ Create or get from cache an execution-plan for the given inputs/outputs.
See
_prune_graph()
and_build_execution_steps()
for detailed description.Parameters: - inputs – A collection with the names of all the given inputs. If None`, all inputs that lead to given outputs are assumed. If string, it is converted to a single-element collection.
- outputs – A collection or the name of the output name(s). If None`, all reachable nodes from the given inputs are assumed. If string, it is converted to a single-element collection.
- predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.
Returns: the cached or fresh new execution plan
Raises: If outputs asked do not exist in network, with msg:
Unknown output nodes: …
If solution does not contain any operations, with msg:
Unsolvable graph: …
If given inputs mismatched plan’s
needs
, with msg:Plan needs more inputs…
If outputs asked cannot be produced by the
dag
, with msg:Impossible outputs…
-
-
class
graphtik.network.
Solution
(plan, input_values)[source]¶ Collects outputs from operations, preserving overwrites.
-
executed
[source]¶ A dictionary with keys the operations executed, and values their status:
- no key: not executed yet
- value None: execution ok
- value Exception: execution failed
-
canceled
[source]¶ A sorted set of canceled operations due to upstream failures.
-
finalized
[source]¶ a flag denoting that this instance cannot accept more results (after the
finalized()
has been invoked)
-
__init__
(plan, input_values)[source]¶ Initialize a ChainMap by setting maps to the given mappings. If no mappings are provided, a single empty dictionary is used.
-
operation_executed
(op, outputs)[source]¶ Invoked once per operation, with its results.
It will update
executed
with the operation status and if outputs were partials, it will updatecanceled
with the unsatisfied ops downstream of op.Parameters: - op – the operation that completed ok
- outputs – The names of the outputs values the op` actually produced, which may be a subset of its provides. Sideffects are not considered.
-
operation_failed
(op, ex)[source]¶ Invoked once per operation, with its results.
It will update
executed
with the operation status and thecanceled
with the unsatisfied ops downstream of op.
-
overwrites
[source]¶ The data in the solution that exist more than once.
A “virtual” property to a dictionary with keys the names of values that exist more than once, and values, all those values in a list, ordered:
- before
finished()
, as computed; - after
finished()
, in reverse.
- before
-
scream_if_incomplete
()[source]¶ Raise a
IncompleteExecutionError
when netop operations failed/canceled.
-
-
class
graphtik.network.
_DataNode
[source]¶ Dag node naming a data-value produced or required by an operation.
-
class
graphtik.network.
_EvictInstruction
[source]¶ A step in the ExecutionPlan to evict a computed value from the solution.
It’s a step in
ExecutionPlan.steps
for the data-node str that frees its data-value from solution after it is no longer needed, to reduce memory footprint while computing the graph.
-
class
graphtik.network.
_OpTask
(op, sol, solid)[source]¶ Mimic
concurrent.futures.Future
for sequential execution.This intermediate class is needed to solve pickling issue with process executor.
-
graphtik.network.
_do_task
(task)[source]¶ Un-dill the simpler
_OpTask
& Dill the results, to pass through pool-processes.
-
graphtik.network.
_optionalized
(graph, data)[source]¶ Retain optionality of a data node based on all needs edges.
-
graphtik.network.
_unsatisfied_operations
(dag, inputs: Collection[T_co]) → List[T][source]¶ Traverse topologically sorted dag to collect un-satisfied operations.
Unsatisfied operations are those suffering from ANY of the following:
- They are missing at least one compulsory need-input.
- Since the dag is ordered, as soon as we’re on an operation, all its needs have been accounted, so we can get its satisfaction.
- Their provided outputs are not linked to any data in the dag.
- An operation might not have any output link when
_prune_graph()
has broken them, due to given intermediate inputs.
Parameters: - dag – a graph with broken edges those arriving to existing inputs
- inputs – an iterable of the names of the input values
Returns: a list of unsatisfied operations to prune
Module: plot¶
Plotting of graphtik graphs.
-
graphtik.plot.
build_pydot
(graph, steps=None, inputs=None, outputs=None, solution=None, title=None, node_props=None, edge_props=None, clusters=None, legend_url='https://graphtik.readthedocs.io/en/latest/_images/GraphtikLegend.svg') → <sphinx.ext.autodoc.importer._MockObject object at 0x7f5721978fd0>[source]¶ Build a Graphviz out of a Network graph/steps/inputs/outputs and return it.
See
Plotter.plot()
for the arguments, sample code, and the legend of the plots.
-
graphtik.plot.
default_jupyter_render
= {'svg_container_styles': '', 'svg_element_styles': 'width: 100%; height: 300px;', 'svg_pan_zoom_json': '{controlIconsEnabled: true, zoomScaleSensitivity: 0.4, fit: true}'}[source]¶ A nested dictionary controlling the rendering of graph-plots in Jupyter cells,
as those returned from
Plotter.plot()
(currently as SVGs). Either modify it in place, or pass another one in the respective methods.The following keys are supported.
Parameters: - svg_pan_zoom_json –
arguments controlling the rendering of a zoomable SVG in Jupyter notebooks, as defined in https://github.com/ariutta/svg-pan-zoom#how-to-use if None, defaults to string (also maps supported):
"{controlIconsEnabled: true, zoomScaleSensitivity: 0.4, fit: true}"
- svg_element_styles –
mostly for sizing the zoomable SVG in Jupyter notebooks. Inspect & experiment on the html page of the notebook with browser tools. if None, defaults to string (also maps supported):
"width: 100%; height: 300px;"
- svg_container_styles – like svg_element_styles, if None, defaults to empty string (also maps supported).
- svg_pan_zoom_json –
-
graphtik.plot.
legend
(filename=None, show=None, jupyter_render: Mapping[KT, VT_co] = None, arch_url='https://graphtik.readthedocs.io/en/latest/arch.html')[source]¶ Generate a legend for all plots (see
Plotter.plot()
for args)Parameters: arch_url – the url to the architecture section explaining graphtik glossary. See
render_pydot()
for the rest arguments.
-
graphtik.plot.
render_pydot
(dot: <sphinx.ext.autodoc.importer._MockObject object at 0x7f5722039cc0>, filename=None, show=False, jupyter_render: str = None)[source]¶ Plot a Graphviz dot in a matplotlib, in file or return it for Jupyter.
Parameters: - dot – the pre-built Graphviz
pydot.Dot
instance - filename (str) – Write diagram into a file.
Common extensions are
.png .dot .jpg .jpeg .pdf .svg
callplot.supported_plot_formats()
for more. - show – If it evaluates to true, opens the diagram in a matplotlib window. If it equals -1, it returns the image but does not open the Window.
- jupyter_render –
a nested dictionary controlling the rendering of graph-plots in Jupyter cells. If None, defaults to
default_jupyter_render
(you may modify those in place and they will apply for all future calls).You may increase the height of the SVG cell output with something like this:
netop.plot(jupyter_render={"svg_element_styles": "height: 600px; width: 100%"})
Returns: the matplotlib image if
show=-1
, or the dot.See
Plotter.plot()
for sample code.- dot – the pre-built Graphviz
Module: config¶
Configurations for network execution, and utilities on them.
-
graphtik.config.
abort_run
()[source]¶ Sets the abort run global flag, to halt all currently or future executing plans.
This global flag is reset when any
NetworkOperation.compute()
is executed, or manually, by callingreset_abort()
.
-
graphtik.config.
evictions_skipped
(enabled)[source]¶ Like
set_skip_evictions()
as a context-manager to reset old value.
-
graphtik.config.
get_execution_pool
() → Optional[Pool][source]¶ Get the process-pool for parallel plan executions.
-
graphtik.config.
operations_endured
(enabled)[source]¶ Like
set_endure_operations()
as a context-manager to reset old value.
-
graphtik.config.
operations_reschedullled
(enabled)[source]¶ Like
set_reschedule_operations()
as a context-manager to reset old value.
-
graphtik.config.
reset_abort
()[source]¶ Reset the abort run global flag, to permit plan executions to proceed.
-
graphtik.config.
set_endure_operations
(enabled)[source]¶ Enable/disable globally endurance to keep executing even if some operations fail.
Parameters: enable – - If
None
(default), respect the flag on each operation; - If true/false, force it for all operations.
Returns: a “reset” token (see ContextVar.set()
).
- If
-
graphtik.config.
set_execution_pool
(pool: Optional[Pool])[source]¶ Set the process-pool for parallel plan executions.
You may have to :also func:set_marshal_tasks() to resolve pickling issues.
-
graphtik.config.
set_marshal_tasks
(enabled)[source]¶ Enable/disable globally marshalling of parallel operations, …
inputs & outputs with
dill
, which might help for pickling problems.Parameters: enable – - If
None
(default), respect the respective flag on each operation; - If true/false, force it for all operations.
Returns: a “reset” token (see ContextVar.set()
)- If
-
graphtik.config.
set_parallel_tasks
(enabled)[source]¶ Enable/disable globally parallel execution of operations.
Parameters: enable – - If
None
(default), respect the respective flag on each operation; - If true/false, force it for all operations.
Returns: a “reset” token (see ContextVar.set()
)- If
-
graphtik.config.
set_reschedule_operations
(enabled)[source]¶ Enable/disable globally rescheduling for operations returning only partial outputs.
Parameters: enable – - If
None
(default), respect the flag on each operation; - If true/false, force it for all operations.
Returns: a “reset” token (see ContextVar.set()
).
- If
-
graphtik.config.
set_skip_evictions
(enabled)[source]¶ When true, disable globally evictions, to keep all intermediate solution values, …
regardless of asked outputs.
Returns: a “reset” token (see ContextVar.set()
)
-
graphtik.config.
tasks_in_parallel
(enabled)[source]¶ Like
set_parallel_tasks()
as a context-manager to reset old value.
-
graphtik.config.
tasks_marshalled
(enabled)[source]¶ Like
set_marshal_tasks()
as a context-manager to reset old value.
Module: base¶
Generic or specific utilities
-
graphtik.base.
NO_RESULT
= <NO_RESULT>[source]¶ When an operation function returns this special value, it implies operation has no result at all, (otherwise, it would have been a single result,
None
).`
-
class
graphtik.base.
Plotter
[source]¶ Classes wishing to plot their graphs should inherit this and …
implement property
plot
to return a “partial” callable that somehow ends up callingplot.render_pydot()
with the graph or any other args bound appropriately. The purpose is to avoid copying this function & documentation here around.-
plot
(filename=None, show=False, jupyter_render: Union[None, Mapping[KT, VT_co], str] = None, **kws)[source]¶ Entry-point for plotting ready made operation graphs.
Parameters: - filename (str) – Write diagram into a file.
Common extensions are
.png .dot .jpg .jpeg .pdf .svg
callplot.supported_plot_formats()
for more. - show – If it evaluates to true, opens the diagram in a matplotlib window. If it equals -1, it plots but does not open the Window.
- inputs – an optional name list, any nodes in there are plotted as a “house”
- outputs – an optional name list, any nodes in there are plotted as an “inverted-house”
- solution – an optional dict with values to annotate nodes, drawn “filled”
(currently content not shown, but node drawn as “filled”).
It extracts more infos from a
Solution
instance, such as, if solution has anexecuted
attribute, operations contained in it are drawn as “filled”. - title – an optional string to display at the bottom of the graph
- node_props – an optional nested dict of Graphviz attributes for certain nodes
- edge_props – an optional nested dict of Graphviz attributes for certain edges
- clusters – an optional mapping of nodes –> cluster-names, to group them
- jupyter_render – a nested dictionary controlling the rendering of graph-plots in Jupyter cells,
if None, defaults to
jupyter_render
(you may modify it in place and apply for all future calls). - legend_url – a URL to the graphtik legend; if it evaluates to false, none is added.
Returns: a pydot.Dot instance (for for API reference visit: https://pydotplus.readthedocs.io/reference.html#pydotplus.graphviz.Dot)
Tip
The
pydot.Dot
instance returned is rendered directly in Jupyter/IPython notebooks as SVG images.You may increase the height of the SVG cell output with something like this:
netop.plot(jupyter_render={"svg_element_styles": "height: 600px; width: 100%"})
Check
default_jupyter_render
for defaults.Note that the graph argument is absent - Each Plotter provides its own graph internally; use directly
render_pydot()
to provide a different graph.NODES:
- oval
- function
- egg
- subgraph operation
- house
- given input
- inversed-house
- asked output
- polygon
- given both as input & asked as output (what?)
- square
- intermediate data, neither given nor asked.
- red frame
- evict-instruction, to free up memory.
- filled
- data node has a value in solution OR function has been executed.
- thick frame
- function/data node in execution steps.
ARROWS
- solid black arrows
- dependencies (source-data need-ed by target-operations, sources-operations provides target-data)
- dashed black arrows
- optional needs
- blue arrows
- sideffect needs/provides
- wheat arrows
- broken dependency (
provide
) during pruning - green-dotted arrows
- execution steps labeled in succession
To generate the legend, see
legend()
.Sample code:
>>> from graphtik import compose, operation >>> from graphtik.modifiers import optional >>> from operator import add
>>> netop = compose("netop", ... operation(name="add", needs=["a", "b1"], provides=["ab1"])(add), ... operation(name="sub", needs=["a", optional("b2")], provides=["ab2"])(lambda a, b=1: a-b), ... operation(name="abb", needs=["ab1", "ab2"], provides=["asked"])(add), ... )
>>> netop.plot(show=True); # plot just the graph in a matplotlib window # doctest: +SKIP >>> inputs = {'a': 1, 'b1': 2} >>> solution = netop(**inputs) # now plots will include the execution-plan
>>> netop.plot('plot1.svg', inputs=inputs, outputs=['asked', 'b1'], solution=solution); # doctest: +SKIP >>> dot = netop.plot(solution=solution); # just get the `pydot.Dot` object, renderable in Jupyter >>> print(dot) digraph G { URL="https://graphtik.readthedocs.io/en/latest/_images/GraphtikLegend.svg"; fontname=italic; label=netop; a [fillcolor=wheat, shape=invhouse, style=filled, tooltip=1]; ...
- filename (str) – Write diagram into a file.
Common extensions are
-
-
graphtik.base.
aslist
(i, argname, allowed_types=<class 'list'>)[source]¶ Utility to accept singular strings as lists, and None –> [].
-
graphtik.base.
jetsam
(ex, locs, *salvage_vars, annotation='jetsam', **salvage_mappings)[source]¶ Annotate exception with salvaged values from locals() and raise!
Parameters: - ex – the exception to annotate
- locs –
locals()
from the context-manager’s block containing vars to be salvaged in case of exceptionATTENTION: wrapped function must finally call
locals()
, because locals dictionary only reflects local-var changes after call. - annotation – the name of the attribute to attach on the exception
- salvage_vars – local variable names to save as is in the salvaged annotations dictionary.
- salvage_mappings – a mapping of destination-annotation-keys –> source-locals-keys;
if a source is callable, the value to salvage is retrieved
by calling
value(locs)
. They take precendance over`salvage_vars`.
Raises: any exception raised by the wrapped function, annotated with values assigned as attributes on this context-manager
- Any attributes attached on this manager are attached as a new dict on
the raised exception as new
jetsam
attribute with a dict as value. - If the exception is already annotated, any new items are inserted, but existing ones are preserved.
Example:
Call it with managed-block’s
locals()
and tell which of them to salvage in case of errors:try: a = 1 b = 2 raise Exception() exception Exception as ex: jetsam(ex, locals(), "a", b="salvaged_b", c_var="c") raise
And then from a REPL:
import sys sys.last_value.jetsam {'a': 1, 'salvaged_b': 2, "c_var": None}
** Reason:**
Graphs may become arbitrary deep. Debugging such graphs is notoriously hard.
The purpose is not to require a debugger-session to inspect the root-causes (without precluding one).
Naively salvaging values with a simple try/except block around each function, blocks the debugger from landing on the real cause of the error - it would land on that block; and that could be many nested levels above it.