5. API Reference

graphtik

Lightweight computation graphs for Python.

graphtik.op

About operation nodes (but not net-ops to break cycle).

graphtik.modifiers

modifiers change the behavior of specific needs or provides.

graphtik.netop

About network operations (those based on graphs)

graphtik.network

compile & execute network graphs of operations.

graphtik.plot

Plotting of graph graphs handled by active plotter.

graphtik.config

configurations for network execution, and utilities on them.

graphtik.base

Generic or specific utilities without polluting imports.

graphtik.sphinxext

Extends Sphinx with graphtik directive rendering plots from doctest code.

Module: op

About operation nodes (but not net-ops to break cycle).

class graphtik.op.FunctionalOperation(fn: Callable, name, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

An operation performing a callable (ie a function, a method, a lambda).

Tip

Use operation() builder class to build instances of this class instead.

compute(named_inputs, outputs=None)dict[source]

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns list

Should return a list values representing the results of running the feed-forward computation on inputs.

withset(**kw)graphtik.op.FunctionalOperation[source]

Make a clone with the some values replaced.

Attention

Using namedtuple._replace() would not pass through cstor, so would not get a nested name with parents, not arguments validation.

class graphtik.op.Operation[source]

An abstract class representing an action with compute().

abstract compute(named_inputs, outputs=None)[source]

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns list

Should return a list values representing the results of running the feed-forward computation on inputs.

graphtik.op.as_renames(i, argname)[source]

parses a list of (source–>destination) from dict, list-of-2-items, single 2-tuple.

class graphtik.op.operation(fn: Callable = None, *, name=None, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

A builder for graph-operations wrapping functions.

Parameters
  • fn – The callable underlying this operation. This does not need to be specified when the operation object is instantiated and can instead be set via __call__ later.

  • name (str) – The name of the operation in the computation graph.

  • needs

    The list of (positionally ordered) names of the data needed by the operation to receive as inputs, roughly corresponding to the arguments of the underlying fn.

    See also needs & modifiers.

  • provides

    Names of output data this operation provides, which must correspond to the returned values of the fn. If more than one given, those must be returned in an iterable, unless returns_dict is true, in which case a dictionary with (at least) as many elements must be returned.

    See also provides & modifiers.

  • aliases – an optional mapping of provides to additional ones

  • rescheduled – If true, underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to returns_dict.

  • endured – If true, even if callable fails, solution will reschedule. ignored if endurance enabled globally.

  • parallel – execute in parallel

  • marshalled – If true, operation will be marshalled while computed, along with its inputs & outputs. (usefull when run in parallel with a process pool).

  • returns_dict – if true, it means the fn returns dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides)

  • node_props – Added as-is into NetworkX graph, and you may filter operations by NetworkOperation.withset(). Also plot-rendering affected if they match Graphviz properties., unless they start with underscore(_)

Returns

when called, it returns a FunctionalOperation

Example:

This is an example of its use, based on the “builder pattern”:

>>> from graphtik import operation
>>> opb = operation(name='add_op')
>>> opb.withset(needs=['a', 'b'])
operation(name='add_op', needs=['a', 'b'], provides=[], fn=None)
>>> opb.withset(provides='SUM', fn=sum)
operation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')

You may keep calling withset() till you invoke a final __call__() on the builder; then you get the actual FunctionalOperation instance:

>>> # Create `Operation` and overwrite function at the last moment.
>>> opb(sum)
FunctionalOperation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')

Tip

Remember to call once more the builder class at the end, to get the actual operation instance.

withset(*, fn: Callable = None, name=None, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)graphtik.op.operation[source]

See operation for arguments here.

graphtik.op.reparse_operation_data(name, needs, provides)[source]

Validate & reparse operation data as lists.

As a separate function to be reused by client code when building operations and detect errors early.

Module: modifiers

modifiers change the behavior of specific needs or provides.

The needs and provides annotated with modifiers designate, for instance, optional function arguments, or “ghost” sideffects.

class graphtik.modifiers.arg[source]

Annotate a needs to map from its name in the inputs to a different argument-name.

Parameters

fn_arg

The argument-name corresponding to this named-input.

Note

This extra mapping argument is needed either for optionals or for functions with keywords-only arguments (like def func(*, foo, bar): ...), since inputs` are normally fed into functions by-position, not by-name.

Example:

In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of arg (or optional):

>>> from graphtik import operation, compose, arg, debug
>>> def myadd(a, *, b):
...    return a + b
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=['a', arg("name-in-inputs", "b")],
...               provides="sum")(myadd)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='myadd',
                         needs=['a',
                         arg('name-in-inputs'-->'b')],
                         provides=['sum'],
                         fn='myadd'))
>>> graph.compute({"a": 5, "name-in-inputs": 4})['sum']
9
class graphtik.modifiers.optional[source]

Annotate optionals needs corresponding to defaulted op-function arguments, …

received only if present in the inputs (when operation is invoked). The value of an optional is passed as a keyword argument to the underlying function.

Example:

>>> from graphtik import operation, compose, optional
>>> def myadd(a, b=0):
...    return a + b

Annotate b as optional argument (and notice it’s default value 0):

>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=["a", optional("b")],
...               provides="sum")(myadd)
... )
>>> graph
NetworkOperation('mygraph',
                 needs=['a', optional('b')],
                 provides=['sum'],
                 x1 ops: myadd)

The graph works both with and without c provided in the inputs:

>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}

Like arg you may map input-name to a different function-argument:

>>> from graphtik import debug
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=['a', optional("quasi-real", "b")],
...               provides="sum")(myadd)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph', needs=['a', optional('quasi-real')], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='myadd', needs=['a', optional('quasi-real'-->'b')], provides=['sum'], fn='myadd'))
>>> graph.compute({"a": 5, "quasi-real": 4})['sum']
9
class graphtik.modifiers.sideffect[source]

sideffects dependencies participates in the graph but not exchanged with functions.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (compilation) but they do not interact with the operation’s function; specifically:

  • input sideffects must exist in the inputs for an operation to kick-in;

  • input sideffects are NOT fed into the function;

  • output sideffects are NOT expected from the function;

  • output sideffects are stored in the solution.

Their purpose is to describe operations that modify the internal state of some of their arguments (“side-effects”).

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes:

>>> from graphtik import operation, compose, sideffect
>>> # Function appending a new dataframe column from two pre-existing ones.
>>> def addcolumns(df):
...    df['sum'] = df['a'] + df['b']

Designate a, b & sum column names as an sideffect arguments:

>>> graph = compose('mygraph',
...     operation(
...         name='addcolumns',
...         needs=['df', sideffect('df.b')],  # sideffect names can be anything
...         provides=[sideffect('df.sum')])(addcolumns)
... )
>>> graph
NetworkOperation('mygraph', needs=['df', 'sideffect(df.b)'],
                 provides=['sideffect(df.sum)'], x1 ops: addcolumns)
>>> df = pd.DataFrame({'a': [5, 0], 'b': [2, 1]})   
>>> graph({'df': df})['df']                         
        a       b
0       5       2
1       0       1

We didn’t get the sum column because the b sideffect was unsatisfied. We have to add its key to the inputs (with any value):

>>> graph({'df': df, sideffect("df.b"): 0})['df']   # doctest: +SKIP
        a       b       sum
0       5       2       7
1       0       1       1

Note that regular data in needs and provides do not match same-named sideffects. That is, in the following operation, the prices input is different from the sideffect(prices) output:

>>> def upd_prices(sales_df, prices):
...     sales_df["Prices"] = prices
>>> operation(fn=upd_prices,
...           name="upd_prices",
...           needs=["sales_df", "price"],
...           provides=[sideffect("price")])
operation(name='upd_prices', needs=['sales_df', 'price'],
          provides=['sideffect(price)'], fn='upd_prices')

Note

An operation with sideffects outputs only, have functions that return no value at all (like the one above). Such operation would still be called for their side-effects, if requested in outputs.

Tip

You may associate sideffects with other data to convey their relationships, simply by including their names in the string - in the end, it’s just a string - but no enforcement will happen from graphtik, like:

>>> sideffect("price[sales_df]")
'sideffect(price[sales_df])'
class graphtik.modifiers.vararg[source]

Annotate optionals needs to be fed as op-function’s *args when present in inputs.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, vararg, debug
>>> def addall(a, *b):
...    return a + sum(b)

Designate b & c as an vararg arguments:

>>> graph = compose(
...     'mygraph',
...     operation(
...               name='addall',
...               needs=['a', vararg('b'), vararg('c')],
...               provides='sum'
...     )(addall)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph',
                 needs=['a', optional('b'), optional('c')],
                 provides=['sum'],
                 x1 ops:
  +--FunctionalOperation(name='addall', needs=['a', vararg('b'), vararg('c')], provides=['sum'], fn='addall'))

The graph works with and without any of b or c inputs:

>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
class graphtik.modifiers.varargs[source]

Like vararg, naming an optional iterable value in the inputs.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...    return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...     provides='sum')(enlist)
... )
>>> graph
NetworkOperation('mygraph',
                 needs=['a', optional('b')],
                 provides=['sum'],
                 x1 ops: enlist)

The graph works with or without b in the inputs:

>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')

Attention

To avoid user mistakes, varargs does not accept strings (though iterables):

>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')

Module: netop

About network operations (those based on graphs)

class graphtik.netop.NetworkOperation(operations, name, *, outputs=None, predicate: Callable[[Any, Mapping], bool] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, merge=None, node_props=None)[source]

An operation that can compute a network-graph of operations.

Tip

Use compose() factory to prepare the net and build instances of this class.

compile(inputs=None, outputs=<UNSET>, predicate: Callable[[Any, Mapping], bool] = <UNSET>)graphtik.network.ExecutionPlan[source]

Produce a plan for the given args or outputs/predicate narrowed earlier.

Parameters
  • named_inputs – a string or a list of strings that should be fed to the needs of all operations.

  • outputs – A string or a list of strings with all data asked to compute. If None, all possible intermediate outputs will be kept. If not given, those set by a previous call to withset() or cstor are used.

  • predicate – Will be stored and applied on the next compute() or compile(). If not given, those set by a previous call to withset() or cstor are used.

Returns

the execution plan satisfying the given inputs, outputs & predicate

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If solution does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

compute(named_inputs: Mapping, outputs: Union[Collection, str, None] = <UNSET>, predicate: Callable[[Any, Mapping], bool] = <UNSET>)graphtik.network.Solution[source]

Compile a plan & execute the graph, sequentially or parallel.

Attention

If intermediate compilation is successful, the “global abort run flag is reset before the execution starts.

Parameters
  • named_inputs – A mapping of names –> values that will be fed to the needs of all operations. Cloned, not modified.

  • outputs – A string or a list of strings with all data asked to compute. If None, all intermediate data will be kept.

Returns

The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If plan does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

See also Operation.compute().

last_plan = None[source]

The execution_plan of the last call to compute(), stored as debugging aid.

name = None[source]

The name for the new netop, used when nesting them.

outputs = None[source]

The outputs names (possibly None) used to compile the plan.

predicate = None[source]

The node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.

prepare_plot_args(plot_args: graphtik.base.PlotArgs)graphtik.base.PlotArgs[source]

Delegate to network if last-plan does not exist.

withset(outputs: Union[Collection, str, None] = <UNSET>, predicate: Callable[[Any, Mapping], bool] = <UNSET>, *, name=None, rescheduled=None, endured=None, parallel=None, marshalled=None)graphtik.netop.NetworkOperation[source]

Return a copy with a network pruned for the given needs & provides.

Parameters
  • outputs – Will be stored and applied on the next compute() or compile(). If not given, the value of this instance is conveyed to the clone.

  • predicate – Will be stored and applied on the next compute() or compile(). If not given, the value of this instance is conveyed to the clone.

  • name

    the name for the new netop:

    • if None, the same name is kept;

    • if True, a distinct name is devised:

      <old-name>-<uid>
      
    • otherwise, the given name is applied.

  • rescheduled – applies rescheduled to all contained operations

  • endured – applies endurance to all contained operations

  • parallel – mark all contained operations to be executed in parallel

  • marshalled – mark all contained operations to be marshalled (usefull when run in parallel with a process pool).

Returns

A narrowed netop clone, which MIGHT be empty!*

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

graphtik.netop.compose(name, op1, *operations, outputs: Union[Collection, str, None] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, merge=False, node_props=None)graphtik.netop.NetworkOperation[source]

Composes a collection of operations into a single computation graph, obeying the merge property, if set in the constructor.

Parameters
  • name (str) – A optional name for the graph being composed by this object.

  • op1 – syntactically force at least 1 operation

  • operations – Each argument should be an operation instance created using operation.

  • merge (bool) – If True, this compose object will attempt to merge together operation instances that represent entire computation graphs. Specifically, if one of the operation instances passed to this compose object is itself a graph operation created by an earlier use of compose the sub-operations in that graph are compared against other operations passed to this compose instance (as well as the sub-operations of other graphs passed to this compose instance). If any two operations are the same (based on name), then that operation is computed only once, instead of multiple times (one for each time the operation appears).

  • rescheduled – applies rescheduled to all contained operations

  • endured – applies endurance to all contained operations

  • parallel – mark all contained operations to be executed in parallel

  • marshalled – mark all contained operations to be marshalled (usefull when run in parallel with a process pool).

  • node_props – Added as-is into NetworkX graph, to provide for filtering by NetworkOperation.withset(). Also plot-rendering affected if they match Graphviz properties, unless they start with underscore(_)

Returns

Returns a special type of operation class, which represents an entire computation graph as a single operation.

Raises

ValueError – If the net` cannot produce the asked outputs from the given inputs.

Module: network

compile & execute network graphs of operations.

exception graphtik.network.AbortedException[source]

Raised from Network when abort_run() is called, and contains the solution …

with any values populated so far.

__module__ = 'graphtik.network'
__weakref__

list of weak references to the object (if defined)

class graphtik.network.ExecutionPlan[source]

A pre-compiled list of operation steps that can execute for the given inputs/outputs.

It is the result of the network’s compilation phase.

Note the execution plan’s attributes are on purpose immutable tuples.

net

The parent Network

needs

An IndexedSet with the input names needed to exist in order to produce all provides.

provides

An IndexedSet with the outputs names produces when all inputs are given.

dag

The regular (not broken) pruned subgraph of net-graph.

steps

The tuple of operation-nodes & instructions needed to evaluate the given inputs & asked outputs, free memory and avoid overwriting any given intermediate inputs.

asked_outs

When true, evictions may kick in (unless disabled by configurations), otherwise, evictions (along with prefect-evictions check) are skipped.

__abstractmethods__ = frozenset({})[source]
__dict__ = mappingproxy({'__module__': 'graphtik.network', '__doc__': "\n A pre-compiled list of operation steps that can :term:`execute` for the given inputs/outputs.\n\n It is the result of the network's :term:`compilation` phase.\n\n Note the execution plan's attributes are on purpose immutable tuples.\n\n .. attribute:: net\n\n The parent :class:`Network`\n .. attribute:: needs\n\n An :class:`.IndexedSet` with the input names needed to exist in order to produce all `provides`.\n .. attribute:: provides\n\n An :class:`.IndexedSet` with the outputs names produces when all `inputs` are given.\n .. attribute:: dag\n\n The regular (not broken) *pruned* subgraph of net-graph.\n .. attribute:: steps\n\n The tuple of operation-nodes & *instructions* needed to evaluate\n the given inputs & asked outputs, free memory and avoid overwriting\n any given intermediate inputs.\n .. attribute:: asked_outs\n\n When true, :term:`evictions` may kick in (unless disabled by :term:`configurations`),\n otherwise, *evictions* (along with prefect-evictions check) are skipped.\n ", 'prepare_plot_args': <function ExecutionPlan.prepare_plot_args>, '__repr__': <function ExecutionPlan.__repr__>, 'validate': <function ExecutionPlan.validate>, '_check_if_aborted': <function ExecutionPlan._check_if_aborted>, '_prepare_tasks': <function ExecutionPlan._prepare_tasks>, '_handle_task': <function ExecutionPlan._handle_task>, '_execute_thread_pool_barrier_method': <function ExecutionPlan._execute_thread_pool_barrier_method>, '_execute_sequential_method': <function ExecutionPlan._execute_sequential_method>, 'execute': <function ExecutionPlan.execute>, '__dict__': <attribute '__dict__' of 'ExecutionPlan' objects>, '__abstractmethods__': frozenset(), '_abc_impl': <_abc_data object>})
__module__ = 'graphtik.network'
__repr__()[source]

Return a nicely formatted representation string

_abc_impl = <_abc_data object>
_check_if_aborted(solution)[source]
_execute_sequential_method(solution: graphtik.network.Solution)[source]

This method runs the graph one operation at a time in a single thread

Parameters

solution – must contain the input values only, gets modified

_execute_thread_pool_barrier_method(solution: graphtik.network.Solution)[source]

This method runs the graph using a parallel pool of thread executors. You may achieve lower total latency if your graph is sufficiently sub divided into operations using this method.

Parameters

solution – must contain the input values only, gets modified

_handle_task(future, op, solution)None[source]

Un-dill parallel task results (if marshalled), and update solution / handle failure.

_prepare_tasks(operations, solution, pool, global_parallel, global_marshal) → Union[Future, graphtik.network._OpTask, bytes][source]

Combine ops+inputs, apply marshalling, and submit to execution pool (or not) …

based on global/pre-op configs.

execute(named_inputs, outputs=None, *, name='')graphtik.network.Solution[source]
Parameters
  • named_inputs – A mapping of names –> values that must contain at least the compulsory inputs that were specified when the plan was built (but cannot enforce that!). Cloned, not modified.

  • outputs – If not None, they are just checked if possible, based on provides, and scream if not.

Returns

The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.

Raises

ValueError

  • If plan does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

prepare_plot_args(plot_args: graphtik.base.PlotArgs)graphtik.base.PlotArgs[source]

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

  • Clone the graph or merge it with the one in the plot_args (see PlotArgs.clone_or_merge_graph().

  • For the rest args, prefer PlotArgs.with_defaults() over _replace(), not to override user args.

validate(inputs: Union[Collection, str, None], outputs: Union[Collection, str, None])[source]

Scream on invalid inputs, outputs or no operations in graph.

Raises

ValueError

  • If cannot produce any outputs from the given inputs, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

exception graphtik.network.IncompleteExecutionError[source]

Error report when any netop operations were canceled/failed.

The exception contains 3 arguments:

  1. the causal errors and conditions (1st arg),

  2. the list of collected exceptions (2nd arg), and

  3. the solution instance (3rd argument), to interrogate for more.

Returned by check_if_incomplete() or raised by scream_if_incomplete().

__module__ = 'graphtik.network'
__str__()[source]

Return str(self).

__weakref__

list of weak references to the object (if defined)

class graphtik.network.Network(*operations, graph=None)[source]

A graph of operations that can compile an execution plan.

needs[source]

the “base”, all data-nodes that are not produced by some operation

provides[source]

the “base”, all data-nodes produced by some operation

__abstractmethods__ = frozenset({})[source]
__init__(*operations, graph=None)[source]
Parameters
  • operations – to be added in the graph

  • graph – if None, create a new.

Raises

ValueError

if dupe operation, with msg:

Operations may only be added once, …

__module__ = 'graphtik.network'
__repr__()[source]

Return repr(self).

_abc_impl = <_abc_data object>
_append_operation(graph, operation: graphtik.op.Operation)[source]

Adds the given operation and its data requirements to the network graph.

  • Invoked during constructor only (immutability).

  • Identities are based on the name of the operation, the names of the operation’s needs, and the names of the data it provides.

  • Adds needs, operation & provides, in that order.

Parameters
  • graph – the networkx graph to append to

  • operation – operation instance to append

_apply_graph_predicate(graph, predicate)[source]
_build_execution_steps(pruned_dag, inputs: Collection, outputs: Optional[Collection]) → List[source]

Create the list of operation-nodes & instructions evaluating all

operations & instructions needed a) to free memory and b) avoid overwriting given intermediate inputs.

Parameters
  • pruned_dag – The original dag, pruned; not broken.

  • outputs – outp-names to decide whether to add (and which) evict-instructions

Instances of _EvictInstructions are inserted in steps between operation nodes to reduce the memory footprint of solutions while the computation is running. An evict-instruction is inserted whenever a need is not used by any other operation further down the DAG.

_cached_plans = None[source]

Speed up compile() call and avoid a multithreading issue(?) that is occurring when accessing the dag in networkx.

_prune_graph(inputs: Union[Collection, str, None], outputs: Union[Collection, str, None], predicate: Callable[[Any, Mapping], bool] = None) → Tuple[networkx.classes.digraph.DiGraph, Collection, Collection, Collection][source]

Determines what graph steps need to run to get to the requested outputs from the provided inputs: - Eliminate steps that are not on a path arriving to requested outputs; - Eliminate unsatisfied operations: partial inputs or no outputs needed; - consolidate the list of needs & provides.

Parameters
  • inputs – The names of all given inputs.

  • outputs – The desired output names. This can also be None, in which case the necessary steps are all graph nodes that are reachable from the provided inputs.

  • predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.

Returns

a 3-tuple with the pruned_dag & the needs/provides resolved based on the given inputs/outputs (which might be a subset of all needs/outputs of the returned graph).

Use the returned needs/provides to build a new plan.

Raises

ValueError

  • if outputs asked do not exist in network, with msg:

    Unknown output nodes: …

_topo_sort_nodes(dag) → List[source]

Topo-sort dag respecting operation-insertion order to break ties.

compile(inputs: Union[Collection, str, None] = None, outputs: Union[Collection, str, None] = None, predicate=None)graphtik.network.ExecutionPlan[source]

Create or get from cache an execution-plan for the given inputs/outputs.

See _prune_graph() and _build_execution_steps() for detailed description.

Parameters
  • inputs – A collection with the names of all the given inputs. If None`, all inputs that lead to given outputs are assumed. If string, it is converted to a single-element collection.

  • outputs – A collection or the name of the output name(s). If None`, all reachable nodes from the given inputs are assumed. If string, it is converted to a single-element collection.

  • predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.

Returns

the cached or fresh new execution plan

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If solution does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

graph = None[source]

The networkx (Di)Graph containing all operations and dependencies, prior to compilation.

prepare_plot_args(plot_args: graphtik.base.PlotArgs)graphtik.base.PlotArgs[source]

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

  • Clone the graph or merge it with the one in the plot_args (see PlotArgs.clone_or_merge_graph().

  • For the rest args, prefer PlotArgs.with_defaults() over _replace(), not to override user args.

class graphtik.network.Solution(plan, input_values)[source]

A chain-map collecting solution outputs and execution state (eg overwrites)

plan[source]

the plan that produced this solution

executed[source]

A dictionary with keys the operations executed, and values their status:

  • no key: not executed yet

  • value None: execution ok

  • value Exception: execution failed

canceled[source]

A sorted set of canceled operations due to upstream failures.

finalized[source]

a flag denoting that this instance cannot accept more results (after the finalized() has been invoked)

__abstractmethods__ = frozenset({})[source]
__copy__()[source]

New ChainMap or subclass with a new copy of maps[0] and refs to maps[1:]

__delitem__(key)[source]
__init__(plan, input_values)[source]

Initialize a ChainMap by setting maps to the given mappings. If no mappings are provided, a single empty dictionary is used.

__module__ = 'graphtik.network'
__repr__()[source]

Return repr(self).

_abc_impl = <_abc_data object>
check_if_incomplete() → Optional[graphtik.network.IncompleteExecutionError][source]

Return a IncompleteExecutionError if netop operations failed/canceled.

dag = None[source]

Cloned from plan will be modified, by removing the downstream edges of:

  • any partial outputs not provided, or

  • all provides of failed operations.

FIXME: SPURIOUS dag reversals on multi-threaded runs (see below next assertion)!

debugstr()[source]
finalize()[source]

invoked only once, after all ops have been executed

is_failed(op)[source]
operation_executed(op, outputs)[source]

Invoked once per operation, with its results.

It will update executed with the operation status and if outputs were partials, it will update canceled with the unsatisfied ops downstream of op.

Parameters
  • op – the operation that completed ok

  • outputs – The names of the outputs values the op` actually produced, which may be a subset of its provides. Sideffects are not considered.

operation_failed(op, ex)[source]

Invoked once per operation, with its results.

It will update executed with the operation status and the canceled with the unsatisfied ops downstream of op.

property overwrites

The data in the solution that exist more than once.

A “virtual” property to a dictionary with keys the names of values that exist more than once, and values, all those values in a list, ordered:

prepare_plot_args(plot_args: graphtik.base.PlotArgs)graphtik.base.PlotArgs[source]

delegate to plan, with solution

scream_if_incomplete()[source]

Raise a IncompleteExecutionError if netop operations failed/canceled.

class graphtik.network._DataNode[source]

Dag node naming a data-value produced or required by an operation.

__module__ = 'graphtik.network'
__repr__()[source]

Return repr(self).

__slots__ = ()[source]
class graphtik.network._EvictInstruction[source]

A step in the ExecutionPlan to evict a computed value from the solution.

It’s a step in ExecutionPlan.steps for the data-node str that frees its data-value from solution after it is no longer needed, to reduce memory footprint while computing the graph.

__module__ = 'graphtik.network'
__repr__()[source]

Return repr(self).

__slots__ = ()[source]
class graphtik.network._OpTask(op, sol, solid)[source]

Mimic concurrent.futures.Future for sequential execution.

This intermediate class is needed to solve pickling issue with process executor.

__call__()[source]

Call self as a function.

__init__(op, sol, solid)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'graphtik.network'
__repr__()[source]

Return repr(self).

__slots__ = ('op', 'sol', 'result', 'solid')
get()[source]

Call self as a function.

logname = 'graphtik.network'
marshalled()[source]
op
result
sol
solid
graphtik.network._do_task(task)[source]

Un-dill the simpler _OpTask & Dill the results, to pass through pool-processes.

See https://stackoverflow.com/a/24673524/548792

graphtik.network._isDebugLogging()[source]
graphtik.network._optionalized(graph, data)[source]

Retain optionality of a data node based on all needs edges.

graphtik.network._unsatisfied_operations(dag, inputs: Collection) → List[source]

Traverse topologically sorted dag to collect un-satisfied operations.

Unsatisfied operations are those suffering from ANY of the following:

  • They are missing at least one compulsory need-input.

    Since the dag is ordered, as soon as we’re on an operation, all its needs have been accounted, so we can get its satisfaction.

  • Their provided outputs are not linked to any data in the dag.

    An operation might not have any output link when _prune_graph() has broken them, due to given intermediate inputs.

Parameters
  • dag – a graph with broken edges those arriving to existing inputs

  • inputs – an iterable of the names of the input values

Returns

a list of unsatisfied operations to prune

graphtik.network._yield_datanodes(nodes)[source]

May scan dag nodes.

graphtik.network.collect_requirements(graph) → Tuple[boltons.setutils.IndexedSet, boltons.setutils.IndexedSet][source]

Collect & split datanodes in (possibly overlapping) needs/provides.

graphtik.network.log = <Logger graphtik.network (WARNING)>

If this logger is eventually DEBUG-enabled, the string-representation of network-objects (network, plan, solution) is augmented with children’s details.

graphtik.network.yield_ops(nodes)[source]

May scan (preferably) plan.steps or dag nodes.

Module: plot

Plotting of graph graphs handled by active plotter.

Separate from graphtik.base to avoid too many imports too early. from contextlib import contextmanager

class graphtik.plot.NodeArgs[source]

All the args for Plotter._make_node() call.

property clustered

Collect the actual clustered dot_nodes among the given nodes.

property dot

Where to add graphviz nodes & stuff.

property dot_node

The pydot-node created

property node_attrs

Attributes gotten from nx-graph fot the given node.

property nx_node

The node (data(str) or Operation) as gotten from nx-graph.

class graphtik.plot.Plotter(style: graphtik.plot.Style = None)[source]

a plotter renders diagram images of plottables.

style[source]

The customizable Style instance controlling theming values & dictionaries for plots.

build_pydot(plot_args: graphtik.base.PlotArgs) → pydot.Dot[source]

Build a pydot.Dot out of a Network graph/steps/inputs/outputs and return it

to be fed into Graphviz to render.

See Plottable.plot() for the arguments, sample code, and the legend of the plots.

legend(filename=None, show=None, jupyter_render: Mapping = None)[source]

Generate a legend for all plots (see Plottable.plot() for args)

See Plotter.render_pydot() for the rest arguments.

plot(plot_args: graphtik.base.PlotArgs)[source]
render_pydot(dot: pydot.Dot, filename=None, show=False, jupyter_render: str = None)[source]

Render a pydot.Dot instance with Graphviz in a file and/or in a matplotlib window.

Parameters
  • dot – the pre-built pydot.Dot instance

  • filename (str) – Write diagram into a file. Common extensions are .png .dot .jpg .jpeg .pdf .svg call supported_plot_formats() for more.

  • show – If it evaluates to true, opens the diagram in a matplotlib window. If it equals -1, it returns the image but does not open the Window.

  • jupyter_render

    a nested dictionary controlling the rendering of graph-plots in Jupyter cells. If None, defaults to default_jupyter_render; you may modify those in place and they will apply for all future calls (see Jupyter notebooks).

    You may increase the height of the SVG cell output with something like this:

    netop.plot(jupyter_render={"svg_element_styles": "height: 600px; width: 100%"})
    

Returns

the matplotlib image if show=-1, or the given dot annotated with any jupyter-rendering configurations given in jupyter_render parameter.

See Plottable.plot() for sample code.

with_styles(**kw)graphtik.plot.Plotter[source]

Returns a cloned plotter with deep-coped styles modified as given.

See also Style.with_set().

class graphtik.plot.Ref(ref, base=None)[source]

Cross-reference (by default) to Style attribute values.

property base
rebased(base)[source]

Makes re-based clone to the given class/object.

ref
property target
class graphtik.plot.Style(_prototype: Optional[graphtik.plot.Style] = None, **kw)[source]

The poor man’s css-like plot styles applied like a theme.

Note

Changing class attributes AFTER the module has loaded WON’T change themes; Either patch directly the Plotter.style of active plotter), or pass a new styles to a new plotter, as described in Plot customizations.

arch_url = 'https://graphtik.readthedocs.io/en/latest/arch.html'

the url to the architecture section explaining graphtik glossary, linked by legend.

broken_color = 'Red'
cancel_color = 'Grey'
failed_color = 'LightCoral'
fill_color = 'wheat'
in_plan = '#000099'
kw_data = {}[source]
kw_data_in_plan = {'color': Ref(<class 'graphtik.plot.Style'>, 'in_plan')}
kw_data_in_solution = {'fillcolor': Ref(<class 'graphtik.plot.Style'>, 'fill_color'), 'style': 'filled'}
kw_data_overwritten = {'fillcolor': Ref(<class 'graphtik.plot.Style'>, 'overwrite_color'), 'style': 'filled'}
kw_data_pruned = {'color': Ref(<class 'graphtik.plot.Style'>, 'cancel_color'), 'fontcolor': Ref(<class 'graphtik.plot.Style'>, 'cancel_color'), 'tooltip': '(pruned node)'}
kw_edge = {}[source]
kw_edge_broken = {'color': Ref(<class 'graphtik.plot.Style'>, 'broken_color')}
kw_edge_endured = {'style': 'dotted'}
kw_edge_optional = {'style': 'dashed'}
kw_edge_rescheduled = {'style': 'dotted'}
kw_edge_sideffect = {'color': 'blue'}
kw_graph = {'fontname': 'italic', 'graph_type': 'digraph', 'splines': 'ortho'}
kw_graph_net = {}[source]
kw_graph_netop = {}[source]
kw_graph_plan = {}[source]
kw_graph_solution = {}[source]
kw_graph_type = {'netop': {}, 'net': {}, 'plan': {}, 'solution': {}, None: {}}

styles per plot-type

kw_legend = {'URL': 'https://graphtik.readthedocs.io/en/latest/_images/GraphtikLegend.svg', 'fillcolor': 'yellow', 'name': 'legend', 'shape': 'component', 'style': 'filled', 'target': '_top'}

If 'URL'` key missing/empty, no legend icon included in plots.

kw_op = {}[source]

props for operation node (outside of label))

kw_op_canceled = {'fillcolor': Ref(<class 'graphtik.plot.Style'>, 'cancel_color')}
kw_op_endured = {'penwidth': Ref(<class 'graphtik.plot.Style'>, 'resched_thickness')}
kw_op_executed = {'fillcolor': Ref(<class 'graphtik.plot.Style'>, 'fill_color')}
kw_op_failed = {'fillcolor': Ref(<class 'graphtik.plot.Style'>, 'failed_color')}
kw_op_label = {}[source]

props only for HTML-Table label

kw_op_pruned = {'color': Ref(<class 'graphtik.plot.Style'>, 'cancel_color'), 'fontcolor': Ref(<class 'graphtik.plot.Style'>, 'cancel_color')}
kw_op_rescheduled = {'penwidth': Ref(<class 'graphtik.plot.Style'>, 'resched_thickness')}
op_bad_html_label_keys = {'label', 'shape', 'style'}

Keys to ignore from operation styles & node-attrs, because they are handled internally by HTML-Label, and/or interact badly with that label.

op_template = <Template memory:7f8efbda9ac8>

Try to mimic a regular Graphviz node attributes (see examples in test.test_plot.test_op_template_full() for params). TODO: fix jinja2 template is un-picklable!

overwrite_color = 'SkyBlue'
py_item_url_format: Union[str, Callable[[str], str]] = None[source]

See Plotter._make_py_item_link().

resched_thickness = 4
resolve_refs(values: dict = None)None[source]

Rebase any refs in styles, and deep copy (for free) as instance attributes.

Raises

Nothing(!), not to CRASH default active plotter on import-time. Ref-errors are log-ERROR reported, and the item with the ref is skipped.

steps_color = '#009999'
with_set(**kw)graphtik.plot.Style[source]

Returns a deep-clone modified by kw.

graphtik.plot.active_plotter_plugged(plotter: graphtik.plot.Plotter)None[source]

Like set_active_plotter() as a context-manager, resetting back to old value.

graphtik.plot.as_identifier(s)[source]

Convert string into a valid ID, both for html & graphviz.

It must not rely on Graphviz’s HTML-like string, because it would not be a valid HTML-ID.

graphtik.plot.default_jupyter_render = {'svg_container_styles': '', 'svg_element_styles': 'width: 100%; height: 300px;', 'svg_pan_zoom_json': '{controlIconsEnabled: true, fit: true}'}

A nested dictionary controlling the rendering of graph-plots in Jupyter cells,

as those returned from Plottable.plot() (currently as SVGs). Either modify it in place, or pass another one in the respective methods.

The following keys are supported.

Parameters
  • svg_pan_zoom_json

    arguments controlling the rendering of a zoomable SVG in Jupyter notebooks, as defined in https://github.com/ariutta/svg-pan-zoom#how-to-use if None, defaults to string (also maps supported):

    "{controlIconsEnabled: true, zoomScaleSensitivity: 0.4, fit: true}"
    

  • svg_element_styles

    mostly for sizing the zoomable SVG in Jupyter notebooks. Inspect & experiment on the html page of the notebook with browser tools. if None, defaults to string (also maps supported):

    "width: 100%; height: 300px;"
    

  • svg_container_styles – like svg_element_styles, if None, defaults to empty string (also maps supported).

graphtik.plot.get_active_plotter()graphtik.plot.Plotter[source]

Get the previously active plotter instance or default one.

graphtik.plot.get_node_name(nx_node)[source]
graphtik.plot.graphviz_html_string(s, *, repl_nl=None, repl_colon=None, xmltext=None)[source]

Workaround pydot parsing of node-id & labels by encoding as HTML.

  • pydot library does not quote DOT-keywords anywhere (pydot#111).

  • Char : on node-names denote port/compass-points and break IDs (pydot#224).

  • Non-strings are not quoted_if_necessary by pydot.

  • NLs im tooltips of HTML-Table labels need substitution with the XML-entity.

  • HTML-Label attributes (xmlattr=True) need both html-escape & quote.

Attention

It does not correctly handle ID:port:compass-point format.

See https://www.graphviz.org/doc/info/lang.html)

graphtik.plot.legend(filename=None, show=None, jupyter_render: Mapping = None, plotter: graphtik.plot.Plotter = None)[source]

Generate a legend for all plots (see Plottable.plot() for args)

Parameters

plotter – override the active plotter

See Plotter.render_pydot() for the rest arguments.

graphtik.plot.quote_html_tooltips(s)[source]

Graphviz HTML-Labels ignore NLs & TABs.

graphtik.plot.quote_node_id(s)[source]

See graphviz_html_string()

graphtik.plot.set_active_plotter(plotter: graphtik.plot.Plotter)[source]

The default instance to render plottables,

unless overridden with a plotter argument in Plottable.plot().

Parameters

plotter – the plotter instance to install

graphtik.plot.supported_plot_formats() → List[str][source]

return automatically all pydot extensions

Module: config

configurations for network execution, and utilities on them.

See also

methods plot.active_plotter_plugged(), plot.set_active_plotter(), plot.get_active_plotter()

Plot configrations were not defined here, not to pollute import space early, until they are actually needed.

graphtik.config.abort_run()[source]

Sets the abort run global flag, to halt all currently or future executing plans.

This global flag is reset when any NetworkOperation.compute() is executed, or manually, by calling reset_abort().

graphtik.config.debug(enabled)[source]

Like set_debug() as a context-manager, resetting back to old value.

graphtik.config.evictions_skipped(enabled)[source]

Like set_skip_evictions() as a context-manager, resetting back to old value.

graphtik.config.execution_pool_plugged(pool: Optional[Pool])[source]

Like set_execution_pool() as a context-manager, resetting back to old value.

graphtik.config.get_execution_pool() → Optional[Pool][source]

Get the process-pool for parallel plan executions.

graphtik.config.is_abort()[source]

Return True if networks have been signaled to stop execution.

graphtik.config.is_debug() → Optional[bool][source]

see set_debug()

graphtik.config.is_endure_operations() → Optional[bool][source]

see set_endure_operations()

graphtik.config.is_marshal_tasks() → Optional[bool][source]

see set_marshal_tasks()

graphtik.config.is_parallel_tasks() → Optional[bool][source]

see set_parallel_tasks()

graphtik.config.is_reschedule_operations() → Optional[bool][source]

see set_reschedule_operations()

graphtik.config.is_skip_evictions() → Optional[bool][source]

see set_skip_evictions()

graphtik.config.is_solid_true(*tristates, default=False)[source]

Utility combining multiple tri-state booleans.

graphtik.config.operations_endured(enabled)[source]

Like set_endure_operations() as a context-manager, resetting back to old value.

graphtik.config.operations_reschedullled(enabled)[source]

Like set_reschedule_operations() as a context-manager, resetting back to old value.

graphtik.config.reset_abort()[source]

Reset the abort run global flag, to permit plan executions to proceed.

graphtik.config.set_debug(enabled)[source]

When true, string-representation of network objects and errors become more detailed.

Note that enabling this flag is different from enabling logging in DEBUG, since it affects all code (eg interactive printing in debugger session, exceptions, doctests), not just debug statements (also affected by this flag).

Returns

a “reset” token (see ContextVar.set())

graphtik.config.set_endure_operations(enabled)[source]

Enable/disable globally endurance to keep executing even if some operations fail.

Parameters

enable

  • If None (default), respect the flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

.

graphtik.config.set_execution_pool(pool: Optional[Pool])[source]

Set the process-pool for parallel plan executions.

You may have to :also func:set_marshal_tasks() to resolve pickling issues.

graphtik.config.set_marshal_tasks(enabled)[source]

Enable/disable globally marshalling of parallel operations, …

inputs & outputs with dill, which might help for pickling problems.

Parameters

enable

  • If None (default), respect the respective flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

graphtik.config.set_parallel_tasks(enabled)[source]

Enable/disable globally parallel execution of operations.

Parameters

enable

  • If None (default), respect the respective flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

graphtik.config.set_reschedule_operations(enabled)[source]

Enable/disable globally rescheduling for operations returning only partial outputs.

Parameters

enable

  • If None (default), respect the flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

.

graphtik.config.set_skip_evictions(enabled)[source]

When true, disable globally evictions, to keep all intermediate solution values, …

regardless of asked outputs.

Returns

a “reset” token (see ContextVar.set())

graphtik.config.tasks_in_parallel(enabled)[source]

Like set_parallel_tasks() as a context-manager, resetting back to old value.

graphtik.config.tasks_marshalled(enabled)[source]

Like set_marshal_tasks() as a context-manager, resetting back to old value.

Module: base

Generic or specific utilities without polluting imports.

exception graphtik.base.MultiValueError[source]
graphtik.base.NO_RESULT = <NO_RESULT>

When an operation function returns this special value, it implies operation has no result at all, (otherwise, it would have been a single result, None).`

class graphtik.base.PlotArgs[source]

All the args of a Plottable.plot() call.

clone_or_merge_graph(base_graph)graphtik.base.PlotArgs[source]

Overlay graph over base_graph, or clone base_graph, if no attribute.

Returns

the updated plot_args

property clusters

Alias for field number 7

property filename

Alias for field number 9

property graph

Alias for field number 1

property inputs

Alias for field number 4

property jupyter_render

Alias for field number 8

property kw_build_pydot
property kw_render_pydot
property name

Alias for field number 2

property outputs

Alias for field number 5

property plotter

Alias for field number 0

property show

Alias for field number 10

property solution

Alias for field number 6

property steps

Alias for field number 3

with_defaults(*args, **kw)graphtik.base.PlotArgs[source]

Replace only fields with None values.

class graphtik.base.Plottable[source]

Classes wishing to plot their graphs should inherit this and …

implement property plot to return a “partial” callable that somehow ends up calling plot.render_pydot() with the graph or any other args bound appropriately. The purpose is to avoid copying this function & documentation here around.

plot(filename=None, show=False, *, plotter: Plotter = None, graph: networkx.Graph = None, name=None, steps=None, inputs=None, outputs=None, solution=None, clusters=None, jupyter_render: Union[None, Mapping, str] = None) → pydot.Dot[source]

Entry-point for plotting ready made operation graphs.

Parameters
  • filename (str) – Write diagram into a file. Common extensions are .png .dot .jpg .jpeg .pdf .svg call plot.supported_plot_formats() for more.

  • show – If it evaluates to true, opens the diagram in a matplotlib window. If it equals -1, it plots but does not open the Window.

  • plotter – an instance to handle plotting; if none, the active plotter is used by default.

  • name – if not given, dot-lang graph would is named “G”; necessary to be unique when referring to generated CMAPs. No need to quote it, handled by the plotter, downstream.

  • graph (str) –

    (optional) A nx.Digraph with overrides to merge with the graph provided by underlying plottables (translated by the active plotter).

    It may contain “public” or “private graph, node & edge attributes:

    • ”private” attributes: those starting with underscore(_), handled by plotter:

      • _source (graph): a non user-overridable attribute with values used to select plotter-styles:

        netop | net | plan | solution | <'_source` missing>
        
      • _fn_link_target & _fn_link_target (node): if truthy, override result 2-tuple of _get_fn_link().

      • _op_tooltip & _fn_tooltip (node): if truthy, override those derrived from _make_op_tooltip() & _make_op_tooltip().

    • ”public” attributes: reaching Graphviz as-is.

      Note

      Remember to escape those values as Graphviz HTML-Like strings (use plot.graphviz_html_string()).

  • inputs – an optional name list, any nodes in there are plotted as a “house”

  • outputs – an optional name list, any nodes in there are plotted as an “inverted-house”

  • solution – an optional dict with values to annotate nodes, drawn “filled” (currently content not shown, but node drawn as “filled”). It extracts more infos from a Solution instance, such as, if solution has an executed attribute, operations contained in it are drawn as “filled”.

  • clusters – an optional mapping of nodes –> cluster-names, to group them

  • jupyter_render – a nested dictionary controlling the rendering of graph-plots in Jupyter cells, if None, defaults to jupyter_render; you may modify it in place and apply for all future calls (see Jupyter notebooks).

Returns

a pydot.Dot instance (for reference to as similar API to pydot.Dot instance, visit: https://pydotplus.readthedocs.io/reference.html#pydotplus.graphviz.Dot)

The pydot.Dot instance returned is rendered directly in Jupyter/IPython notebooks as SVG images (see Jupyter notebooks).

Note that the graph argument is absent - Each Plottable provides its own graph internally; use directly render_pydot() to provide a different graph.

Graphtik Legend

NODES:

oval

function

egg

subgraph operation

house

given input

inversed-house

asked output

polygon

given both as input & asked as output (what?)

square

intermediate data, neither given nor asked.

red frame

evict-instruction, to free up memory.

filled

data node has a value in solution OR function has been executed.

thick frame

function/data node in execution steps.

ARROWS

solid black arrows

dependencies (source-data need-ed by target-operations, sources-operations provides target-data)

dashed black arrows

optional needs

blue arrows

sideffect needs/provides

wheat arrows

broken dependency (provide) during pruning

green-dotted arrows

execution steps labeled in succession

To generate the legend, see legend().

Sample code:

>>> from graphtik import compose, operation
>>> from graphtik.modifiers import optional
>>> from operator import add
>>> netop = compose("netop",
...     operation(name="add", needs=["a", "b1"], provides=["ab1"])(add),
...     operation(name="sub", needs=["a", optional("b2")], provides=["ab2"])(lambda a, b=1: a-b),
...     operation(name="abb", needs=["ab1", "ab2"], provides=["asked"])(add),
... )
>>> netop.plot(show=True);                 # plot just the graph in a matplotlib window # doctest: +SKIP
>>> inputs = {'a': 1, 'b1': 2}
>>> solution = netop(**inputs)             # now plots will include the execution-plan
>>> netop.plot('plot1.svg', inputs=inputs, outputs=['asked', 'b1'], solution=solution);           # doctest: +SKIP
>>> dot = netop.plot(solution=solution);   # just get the `pydot.Dot` object, renderable in Jupyter
>>> print(dot)
digraph netop {
fontname=italic;
label=<netop>;
splines=ortho;
<a> [fillcolor=wheat, shape=invhouse, style=filled, tooltip="(int) 1"];
...
abstract prepare_plot_args(plot_args: graphtik.base.PlotArgs)graphtik.base.PlotArgs[source]

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

class graphtik.base.Token(*args)[source]

Guarantee equality, not(!) identity, across processes.

hashid
graphtik.base.aslist(i, argname, allowed_types=<class 'list'>)[source]

Utility to accept singular strings as lists, and None –> [].

graphtik.base.astuple(i, argname, allowed_types=<class 'tuple'>)[source]
graphtik.base.func_name(fn, default=Ellipsis, mod=None, fqdn=None, human=None) → Optional[str][source]

FQDN of fn, descending into partials to print their args.

Parameters
  • default – What to return if it fails; by default it raises.

  • mod – when true, prepend module like module.name.fn_name

  • fqdn – when true, use __qualname__ (instead of __name__) which differs mostly on methods, where it contains class(es), and locals, respectively (PEP 3155). Sphinx uses fqdn=True for generating IDs.

  • human – when true, partials denote their args like fn({"a": 1}, ...) in the returned text, otherwise, just the (fqd-)name, appropriate for IDs.

Returns

a (possibly dot-separated) string, or default (unless this is ...`).

Raises

Only if default is ..., otherwise, errors debug-logged.

Examples

>>> func_name(func_name)
'func_name'
>>> func_name(func_name, mod=1)
'graphtik.base.func_name'
>>> func_name(MultiValueError.mro, fqdn=0)
'mro'
>>> func_name(MultiValueError.mro, fqdn=1)
'MultiValueError.mro'

Even functions defined in docstrings are reported:

>>> def f():
...     def inner():
...         pass
...     return inner
>>> func_name(f, mod=1, fqdn=1)
'graphtik.base.f'
>>> func_name(f(), fqdn=1)
'f.<locals>.inner'

On failures, arg default controls the outcomes:

TBD

graphtik.base.func_source(fn, default=Ellipsis, human=None) → Optional[Tuple[str, int]][source]

Like inspect.getsource() supporting partials.

Parameters
  • default – If given, better be a 2-tuple respecting types, or ..., to raise.

  • human – when true, denote builtins like python does

graphtik.base.func_sourcelines(fn, default=Ellipsis, human=None) → Optional[Tuple[str, int]][source]

Like inspect.getsourcelines() supporting partials.

Parameters

default – If given, better be a 2-tuple respecting types, or ..., to raise.

graphtik.base.jetsam(ex, locs, *salvage_vars: str, annotation='jetsam', **salvage_mappings)[source]

Annotate exception with salvaged values from locals() and raise!

Parameters
  • ex – the exception to annotate

  • locs

    locals() from the context-manager’s block containing vars to be salvaged in case of exception

    ATTENTION: wrapped function must finally call locals(), because locals dictionary only reflects local-var changes after call.

  • annotation – the name of the attribute to attach on the exception

  • salvage_vars – local variable names to save as is in the salvaged annotations dictionary.

  • salvage_mappings – a mapping of destination-annotation-keys –> source-locals-keys; if a source is callable, the value to salvage is retrieved by calling value(locs). They take precedence over`salvage_vars`.

Raises

any exception raised by the wrapped function, annotated with values assigned as attributes on this context-manager

  • Any attributes attached on this manager are attached as a new dict on the raised exception as new jetsam attribute with a dict as value.

  • If the exception is already annotated, any new items are inserted, but existing ones are preserved.

Example:

Call it with managed-block’s locals() and tell which of them to salvage in case of errors:

try:
    a = 1
    b = 2
    raise Exception()
exception Exception as ex:
    jetsam(ex, locals(), "a", b="salvaged_b", c_var="c")
    raise

And then from a REPL:

import sys
sys.last_value.jetsam
{'a': 1, 'salvaged_b': 2, "c_var": None}

** Reason:**

Graphs may become arbitrary deep. Debugging such graphs is notoriously hard.

The purpose is not to require a debugger-session to inspect the root-causes (without precluding one).

Naively salvaging values with a simple try/except block around each function, blocks the debugger from landing on the real cause of the error - it would land on that block; and that could be many nested levels above it.

Module: sphinxext

Extends Sphinx with graphtik directive rendering plots from doctest code.

class graphtik.sphinxext.DocFilesPurgatory[source]

Keeps 2-way associations of docs <–> abs-files, to purge them.

doc_fpaths: Dict[str, Set[Path]] = None[source]

Multi-map: docnames -> abs-file-paths

purge_doc(docname: str)[source]

Remove doc-files not used by any other doc.

register_doc_fpath(docname: str, fpath: pathlib.Path)[source]

Must be absolute, for purging to work.

class graphtik.sphinxext.GraphtikDoctestDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)[source]

Embeds plots from doctest code (see graphtik).

class graphtik.sphinxext.GraphtikTestoutputDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)[source]

Like graphtik directive, but emulates doctest testoutput blocks.

class graphtik.sphinxext.dynaimage(rawsource='', *children, **attributes)[source]

Writes a tag in tag attr (<img> for PNGs, <object> for SVGs/PDFs).

class graphtik.sphinxext.graphtik_node(rawsource='', *children, **attributes)[source]

Non-writtable node wrapping (literal-block + figure(dynaimage)) nodes.

The figure gets the :name: & :align: options, the contained dynaimage gets the :height”, :width:, :scale:, :classes: options.