5. API Reference

graphtik

Lightweight computation graphs for Python.

graphtik.fnop

compose operation/dependency from functions, matching/zipping inputs/outputs during execution.

graphtik.pipeline

compose pipelines by combining operations into network.

graphtik.modifier

modifiers change dependency behavior during planning & execution.

graphtik.planning

compose network of operations & dependencies, compile the plan.

graphtik.execution

execute the plan to derive the solution.

graphtik.plot

plotting handled by the active plotter & current theme.

graphtik.config

configurations for network execution, and utilities on them.

graphtik.base

Generic utilities, exceptions and operation & plottable base classes.

graphtik.sphinxext

Extends Sphinx with graphtik directive for plotting from doctest code.

Figure: graphtik-v8.3.1+ module dependencies. Base modules: config.py and modifiers.py (public, imported by almost everything) and base.py (imported by everything). Public: fnop.py, pipeline.py; core (private): planning.py, execution.py; extras: plot.py, sphinxext/. Import-time dependencies: fnop.py, planning.py, execution.py, pipeline.py and plot.py import base.py; execution.py imports planning.py; sphinxext/ imports plot.py. Run-time dependencies: base.py uses plot.py, planning.py uses execution.py, pipeline.py uses planning.py, and fnop.py uses pipeline.py (just for plotting).

Module: fnop

compose operation/dependency from functions, matching/zipping inputs/outputs during execution.

Note

This module (along with modifier & pipeline) is what client code needs to define pipelines at import time without incurring a heavy price (<5ms on a 2019 fast PC).

class graphtik.fnop.FnOp(fn: Callable = None, name=None, needs: Optional[Union[Collection, str]] = None, provides: Optional[Union[Collection, str]] = None, aliases: Mapping = None, *, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

An operation performing a callable (ie a function, a method, a lambda).

Tip

  • Use operation() factory to build instances of this class instead.

  • Call withset() on existing instances to re-configure new clones.

  • See diacritics to understand printouts of this class.

Differences between various dependency operation attributes:

dependency attribute       dupes   sfx   alias   SFXED
needs      needs           yes     yes   no      SINGULAR
           op_needs        no      yes   no      SINGULAR
           _fn_needs       yes     no    no      STRIPPED
provides   provides        yes     yes   no      SINGULAR
           op_provides     no      yes   yes     SINGULAR
           _fn_provides    yes     no    no      STRIPPED

__abstractmethods__ = frozenset({})[source]
__call__(*args, **kwargs)[source]

Accepts the same arguments as dict() and delegates the resulting mapping to compute().

__eq__(other)[source]

Operation identity is based on name.

__hash__()[source]

Operation identity is based on name.

__init__(fn: Callable = None, name=None, needs: Optional[Union[Collection, str]] = None, provides: Optional[Union[Collection, str]] = None, aliases: Mapping = None, *, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

Build a new operation out of some function and its requirements.

See operation() for the full documentation of parameters; study the code for attributes (or read them on the rendered Sphinx site).

__module__ = 'graphtik.fnop'
__repr__()[source]

Display operation & dependency names annotated with diacritics.

_abc_impl = <_abc_data object>
_fn_needs[source]

Value names the underlying function requires (DUPES preserved, NO-SFX, STRIPPED sideffected).

_fn_provides[source]

Value names the underlying function produces (DUPES, NO-ALIASES, NO-SFX, STRIPPED sideffected).

_match_inputs_with_fn_needs(named_inputs) → Tuple[list, list, dict][source]
_prepare_match_inputs_error(missing: List, varargs_bad: List, named_inputs: Mapping) → ValueError[source]
_zip_results_with_provides(results) → dict[source]

Zip results with expected “real” (without sideffects) provides.
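The zipping rules stated here and under operation() (a single provide receives the returned value as-is, several provides need an iterable of equal length, and returns_dict results pass through untouched) can be illustrated with a minimal, library-free sketch. The function zip_results below is hypothetical, not graphtik's implementation, and it ignores sideffects and aliases.

```python
from typing import Any, Dict, List


def zip_results(results: Any, provides: List[str], returns_dict: bool = False) -> Dict[str, Any]:
    """Hypothetical helper mimicking how results are matched to "real" provides."""
    if returns_dict:
        # The function already returned a mapping: no further processing.
        return dict(results)
    if not provides:
        # Nothing to zip (e.g. only sideffects were declared).
        return {}
    if len(provides) == 1:
        # A single provide receives the returned value as-is.
        return {provides[0]: results}
    results = list(results)
    if len(results) != len(provides):
        raise ValueError(f"Expected {len(provides)} results, got {len(results)}: {results!r}")
    # Several provides: pair them positionally with the returned values.
    return dict(zip(provides, results))


print(zip_results((1, 2), ["FOO", "BAZ"]))  # → {'FOO': 1, 'BAZ': 2}
```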

aliases[source]

An optional mapping of fn_provides to additional ones, together comprising this operation's op_provides.

You cannot alias an alias.
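A minimal sketch of how such a mapping could extend the results, assuming aliases are held as (source, destination) pairs, as the aliases=[...] reprs in the withset() examples suggest. apply_aliases is a hypothetical helper; looking up sources only in the function's own results is what makes an alias of an alias fail.

```python
from typing import Any, Dict, Iterable, Tuple


def apply_aliases(results: Dict[str, Any],
                  aliases: Iterable[Tuple[str, str]]) -> Dict[str, Any]:
    """Hypothetical helper: copy each aliased output under its extra name."""
    aliased = dict(results)  # leave the function's own results untouched
    for src, dst in aliases:
        # Sources are looked up in the original results only, so an alias
        # of an alias (a source that is itself an alias) is rejected.
        if src not in results:
            raise ValueError(f"Cannot alias {src!r} -> {dst!r}: not a function output")
        aliased[dst] = results[src]
    return aliased


print(apply_aliases({"b": 1}, [("b", "B-aliased")]))  # → {'b': 1, 'B-aliased': 1}
```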

compute(named_inputs=None, outputs: Optional[Union[Collection, str]] = None) → dict[source]

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns

dict – the output values produced by running the computation on the inputs, keyed by output name.

property deps

All dependency names, including op_ & internal _fn_.

If not in DEBUG mode, all deps are converted into lists, ready to be printed.

endured[source]

If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.

fn[source]

The operation’s underlying function.

marshalled[source]

If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).

name[source]

A name for the operation (e.g. ‘conv1’, ‘sum’, etc.); any “parents” are split by dots (.). See also: Nesting

needs[source]

The needs almost as given by the user (which may contain MULTI-sideffecteds and dupes), roughly morphed into _fn_needs + sideffects (DUPES, SFX, SINGULARIZED sideffecteds). It is stored for the builder functionality to work.

node_props[source]

Added as-is into the NetworkX graph, so you may filter operations with Pipeline.withset(). Plot rendering is also affected if they match Graphviz properties, unless they start with an underscore (_).

op_needs[source]

Value names ready to lay the graph for pruning (NO-DUPES, SFX, SINGULAR sideffecteds).

op_provides[source]

Value names ready to lay the graph for pruning (NO-DUPES, ALIASES, SFX, SINGULAR sideffecteds).

parallel[source]

If true, execute in parallel.

prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs[source]

Delegate to a provisional network with a single op.

provides[source]

The provides almost as given by the user (which may contain MULTI-sideffecteds and dupes), roughly morphed into _fn_provides + sideffects (DUPES, NO-ALIASES, SFX, SINGULAR sideffecteds). It is stored for builder functionality to work.

rescheduled[source]

If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).

returns_dict[source]

If true, the underlying function returns a dictionary, and no further processing is done on its results, i.e. the returned output values are not zipped with provides.

It does not have to return any alias outputs.

Can be changed amidst execution by the operation’s function, but it is easier for that function to simply call set_results_by_name().

validate_fn_name()[source]

Call it before enclosing it in a pipeline, or it will fail on compute().

withset(fn: Callable = Ellipsis, name=Ellipsis, needs: Optional[Union[Collection, str]] = Ellipsis, provides: Optional[Union[Collection, str]] = Ellipsis, aliases: Mapping = Ellipsis, *, rescheduled=Ellipsis, endured=Ellipsis, parallel=Ellipsis, marshalled=Ellipsis, returns_dict=Ellipsis, node_props: Mapping = Ellipsis, renamer=None) → graphtik.fnop.FnOp[source]

Make a clone with some values replaced, or with the operation and dependencies renamed.

If renamer is given, it is applied on top of (and after) any other changed values, for the operation name, needs, provides & any aliases.

Parameters

renamer

  • if a dictionary, it renames any operations & data named as keys into the respective values by feeding them into dep_renamed(), so values may be single-input callables themselves.

  • if it is a callable(), it is given a RenArgs instance to decide the node’s name.

The callable may return a str for the new-name, or any other false value to leave node named as is.

Attention

If a callable is given, it SHOULD preserve any modifiers on dependencies, by using dep_renamed().

Returns

a clone operation with changed/renamed values asked

Raises
  • (ValueError, TypeError): all cstor validation errors

  • ValueError: if a renamer dict contains a non-string and non-callable value

Examples

>>> from graphtik import operation, sfx
>>> op = operation(str, "foo", needs="a",
...     provides=["b", sfx("c")],
...     aliases={"b": "B-aliased"})
>>> op.withset(renamer={"foo": "BAR",
...                     'a': "A",
...                     'b': "B",
...                     sfx('c'): "cc",
...                     "B-aliased": "new.B-aliased"})
FnOp(name='BAR',
                    needs=['A'],
                    provides=['B', sfx('cc')],
                    aliases=[('B', 'new.B-aliased')],
                    fn='str')
  • Notice that the 'c' rename changes the “sideffect name”, without the destination name being an sfx() modifier (but the source name must match the sfx-specifier).

  • Notice that the source of the alias b-->B is handled implicitly by the respective rename on the provides.

But usually a callable is more practical, like the one below renaming only data names:

>>> from graphtik.modifier import dep_renamed
>>> op.withset(renamer=lambda ren_args:
...            dep_renamed(ren_args.name, lambda n: f"parent.{n}")
...            if ren_args.typ != 'op' else
...            False)
FnOp(name='foo',
                    needs=['parent.a'],
                    provides=['parent.b', sfx('parent.c')],
                    aliases=[('parent.b', 'parent.B-aliased')],
                    fn='str')

Notice the double use of lambdas with dep_renamed() – an equivalent rename callback would be:

dep_renamed(ren_args.name, f"parent.{dependency(ren_args.name)}")
graphtik.fnop.NO_RESULT = <NO_RESULT>

A special return value for the function of a rescheduled operation, signifying that it did not produce any result at all (including sideffects); otherwise, it would have been a single result, None. Useful for rescheduled operations that want to cancel their single result without being declared as returns_dict.

graphtik.fnop.NO_RESULT_BUT_SFX = <NO_RESULT_BUT_SFX>

Like NO_RESULT but does not cancel any sideffects declared as provides.

graphtik.fnop._spread_sideffects(deps: Collection[str]) → Tuple[Collection[str], Collection[str]][source]

Build fn/op dependencies from user ones by stripping or singularizing any sideffects.

Returns

the given deps duplicated as (fn_deps, op_deps), where any instances of sideffects are processed like this:

fn_deps
  • any sfxed() are replaced by the stripped dependency consumed/produced by underlying functions, in the order they are first met (any further duplicate sideffecteds are discarded).

  • any sfx() are simply dropped;

op_deps

any sfxed() are replaced by a sequence of “singularized” instances, one for each item in their _Modifier.sfx_list attribute, in the order they are first met (any duplicates are discarded, order is irrelevant, since they don’t reach the function);
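The two processing rules can be sketched without graphtik, assuming hypothetical stand-ins Sfx and Sfxed for the sfx() and sfxed() modifiers (plain named tuples here, not the real _Modifier class). Dropping duplicate plain dependencies from op_deps is an assumption taken from the NO-DUPES note on op_needs, not something this function is documented to do.

```python
from typing import List, NamedTuple, Tuple, Union


class Sfx(NamedTuple):
    """Hypothetical stand-in for a pure sideffect, like sfx("c")."""
    name: str


class Sfxed(NamedTuple):
    """Hypothetical stand-in for a sideffected dependency with its sideffect names."""
    dep: str
    sfx_list: Tuple[str, ...]


Dep = Union[str, Sfx, Sfxed]


def spread_sideffects(deps: List[Dep]) -> Tuple[List[Dep], List[Dep]]:
    """Return (fn_deps, op_deps) following the rules above."""
    fn_deps: List[Dep] = []
    op_deps: List[Dep] = []
    stripped_seen = set()
    for dep in deps:
        if isinstance(dep, Sfxed):
            # fn side: the stripped dependency, only the first time it is met.
            if dep.dep not in stripped_seen:
                stripped_seen.add(dep.dep)
                fn_deps.append(dep.dep)
            # op side: one singularized sfxed per sideffect name, no duplicates.
            for sfx_name in dep.sfx_list:
                single = Sfxed(dep.dep, (sfx_name,))
                if single not in op_deps:
                    op_deps.append(single)
        elif isinstance(dep, Sfx):
            # Pure sideffects never reach the function, only the graph.
            if dep not in op_deps:
                op_deps.append(dep)
        else:
            # Plain names: dupes kept for the function, not for the graph (assumed).
            fn_deps.append(dep)
            if dep not in op_deps:
                op_deps.append(dep)
    return fn_deps, op_deps


fn_deps, op_deps = spread_sideffects(["a", Sfxed("b", ("x", "y")), "a", Sfx("c")])
```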

graphtik.fnop.as_renames(i, argname)[source]

Parse a list of (source -> destination) pairs from a dict, a list of 2-item sequences, or a single 2-tuple.

Returns

a (possibly empty) list of pairs

Note

The same source may be repeatedly renamed to multiple destinations.
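The accepted shapes can be sketched as follows; as_renames_sketch is a hypothetical re-implementation that only mirrors the documented inputs and output (the real checks and error messages may differ). A list of pairs is the shape that lets one source be renamed to several destinations, since a dict cannot repeat keys.

```python
from collections.abc import Mapping
from typing import Any, List, Tuple


def as_renames_sketch(i: Any, argname: str) -> List[Tuple[str, str]]:
    """Hypothetical: normalize renames into a (possibly empty) list of pairs."""
    if not i:
        return []
    if isinstance(i, Mapping):
        # {source: destination, ...}
        return list(i.items())
    if isinstance(i, tuple) and len(i) == 2 and all(isinstance(x, str) for x in i):
        # A single (source, destination) 2-tuple.
        return [i]
    # Otherwise, expect an iterable of 2-item sequences.
    pairs = [tuple(pair) for pair in i]
    for pair in pairs:
        if len(pair) != 2:
            raise ValueError(f"{argname}: expected (source, destination) pairs, got {pair!r}")
    return pairs


print(as_renames_sketch([("a", "b"), ("a", "c")], "aliases"))  # → [('a', 'b'), ('a', 'c')]
```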

graphtik.fnop.identity_fn(*args, **kwargs)[source]

Act as the default function for the conveyor operation when no fn is given.

Adapted from https://stackoverflow.com/a/58524115/548792
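A conveyor's identity function must hand its inputs back in a shape that the zipping with provides understands. The sketch below is a hypothetical identity over positional arguments only (any keyword handling by the real identity_fn is not shown and not assumed): one argument comes back as-is, several as a tuple.

```python
def identity_sketch(*args):
    """Hypothetical conveyor function: return the inputs unchanged."""
    if not args:
        return None  # nothing to convey
    if len(args) == 1:
        return args[0]  # a single value is not wrapped in a tuple
    return args  # several values, ready to be zipped with provides


# Two needs conveyed into two provides, as in the "copy" example under operation():
print(dict(zip(["FOO", "BAZ"], identity_sketch(1, 2))))  # → {'FOO': 1, 'BAZ': 2}
```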

graphtik.fnop.jsonp_ize(dep)[source]
graphtik.fnop.jsonp_ize_all(deps)[source]

Auto-convert deps with slashes as jsonp (unless no_jsonp).

graphtik.fnop.operation(fn: Callable = <UNSET>, name=<UNSET>, needs: Optional[Union[Collection, str]] = <UNSET>, provides: Optional[Union[Collection, str]] = <UNSET>, aliases: Mapping = <UNSET>, *, rescheduled=<UNSET>, endured=<UNSET>, parallel=<UNSET>, marshalled=<UNSET>, returns_dict=<UNSET>, node_props: Mapping = <UNSET>)[source]

An operation factory that works like a “fancy decorator”.

Parameters
  • fn

    The callable underlying this operation:

    • if not given, it returns the withset() method as the decorator, so it still supports all arguments, apart from fn.

    • if given, it builds the operation right away (along with any other arguments);

    • if given, but is None, it will assign the default identity function right before it is computed.

    Hint

    This is a twisted way for “fancy decorators”.

    After all that, you can always call FnOp.withset() on existing operation, to obtain a re-configured clone.

    If the fn is still not given when calling FnOp.compute(), then the default identity function is implied, provided that name is given and the number of provides matches the number of needs.

  • name (str) – The name of the operation in the computation graph. If not given, deduce from any fn given.

  • needs

    the list of (positionally ordered) names of the data needed by the operation to receive as inputs, roughly corresponding to the arguments of the underlying fn (plus any sideffects).

    It can be a single string, in which case a 1-element iterable is assumed.

  • provides

    the list of (positionally ordered) output data this operation provides, which must, roughly, correspond to the returned values of the fn (plus any sideffects & aliases).

    It can be a single string, in which case a 1-element iterable is assumed.

    If there are more than one, the underlying function must return an iterable with the same number of elements, unless param returns_dict is true, in which case it must return a dictionary containing (at least) those named elements.

  • aliases – an optional mapping of provides to additional ones

  • rescheduled – If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).

  • endured – If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.

  • parallel – execute in parallel

  • marshalled – If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).

  • returns_dict – If true, the fn returns a dictionary with all provides, and no further processing is done on them (i.e. the returned output values are not zipped with provides).

  • node_props – Added as-is into the NetworkX graph, so you may filter operations with Pipeline.withset(). Plot rendering is also affected if they match Graphviz properties, unless they start with an underscore (_).

Returns

when called with fn, it returns a FnOp, otherwise it returns a decorator function that accepts fn as the 1st argument.

Note

Actually, the returned decorator is the FnOp.withset() method, which accepts all arguments and is monkeypatched to support calling a virtual withset() method on it, so as not to interrupt the builder pattern; besides that trick, it is just a bound method.
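The "fancy decorator" control flow can be sketched with a frozen dataclass. TinyOp and tiny_operation below are hypothetical, much simplified stand-ins for FnOp and operation() (no validation, needs/provides given as tuples, no monkeypatched withset attribute, and a private _UNSET sentinel imitating <UNSET>): without fn, the factory hands back a bound withset() that then receives the decorated function as its first argument.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional, Tuple

_UNSET = object()  # sentinel distinguishing "not given" from None


@dataclass(frozen=True)
class TinyOp:
    """Hypothetical, stripped-down stand-in for FnOp."""
    fn: Optional[Callable] = None
    name: Optional[str] = None
    needs: Tuple[str, ...] = ()
    provides: Tuple[str, ...] = ()

    def withset(self, fn=_UNSET, **kw) -> "TinyOp":
        """Clone, replacing only the values actually given."""
        if fn is not _UNSET:
            kw["fn"] = fn
        if "name" not in kw and kw.get("fn") is not None and self.name is None:
            # Deduce the name from the function, if none was given.
            kw["name"] = getattr(kw["fn"], "__name__", None)
        return replace(self, **kw)


def tiny_operation(fn=_UNSET, **kw):
    """Without fn, return the bound withset() to act as a decorator."""
    op = TinyOp(**kw)
    if fn is _UNSET:
        return op.withset
    return op.withset(fn=fn)


@tiny_operation(needs=("a", "b"), provides=("SUM",))
def add(a, b):
    return a + b


print(add.name, add.fn(1, 2))  # → add 3
```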

Example:

If no fn given, it returns the withset method, to act as a decorator:

>>> from graphtik import operation, varargs
>>> op = operation()
>>> op
<function FnOp.withset at ...

But if fn is set to None:

>>> op = op(needs=['a', 'b'])
>>> op
FnOp(name=None, needs=['a', 'b'], fn=None)

If you call an operation without fn and no name, it will scream:

>>> op.compute({"a":1, "b": 2})
Traceback (most recent call last):
ValueError: Operation must have a callable `fn` and a non-empty `name`:
    FnOp(name=None, needs=['a', 'b'], fn=None)
  (tip: for defaulting `fn` to conveyor-identity, # of provides must equal needs)

But if you give just a name with None as fn, it will build a conveyor operation for some needs & provides:

>>> op = operation(None, name="copy", needs=["foo", "bar"], provides=["FOO", "BAZ"])
>>> op.compute({"foo":1, "bar": 2})
{'FOO': 1, 'BAZ': 2}

You may keep calling withset() on an operation, to build modified clones:

>>> op = op.withset(needs=['a', 'b'],
...                 provides='SUM', fn=lambda a, b: a + b)
>>> op
FnOp(name='copy', needs=['a', 'b'], provides=['SUM'], fn='<lambda>')
>>> op.compute({"a":1, "b": 2})
{'SUM': 3}
>>> op.withset(fn=lambda a, b: a * b).compute({'a': 2, 'b': 5})
{'SUM': 10}
graphtik.fnop.reparse_operation_data(name, needs, provides, aliases=()) → Tuple[Hashable, Collection[str], Collection[str], Collection[Tuple[str, str]]][source]

Validate & reparse operation data as lists.

Returns

name, needs, provides, aliases

It is a separate function so that client code building operations can reuse it, to detect errors early.

Module: pipeline

compose pipelines by combining operations into network.

Note

This module (along with op & modifier) is what client code needs to define pipelines at import time without incurring a heavy price (<5ms on a 2019 fast PC).

class graphtik.pipeline.NULL_OP(name)[source]

Eliminates same-named operations added later during operation merging.

Seealso

Merging

__abstractmethods__ = frozenset({})[source]
__eq__(o)[source]

Return self==value.

__hash__()[source]

Return hash(self).

__init__(name)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'graphtik.pipeline'
__repr__()[source]

Return repr(self).

_abc_impl = <_abc_data object>
compute(*args, **kw)[source]

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns

list – should return a list of values representing the results of running the feed-forward computation on the inputs.

name[source]
needs[source]
op_needs[source]
op_provides[source]
prepare_plot_args(*args, **kw)[source]

Delegate to a provisional network with a single op.

provides[source]
class graphtik.pipeline.Pipeline(operations, name, *, outputs=None, predicate: NodePredicate = None, rescheduled=None, endured=None, parallel=None, marshalled=None, node_props=None, renamer=None)[source]

An operation that can compute a network-graph of operations.

Tip

  • Use compose() factory to prepare the net and build instances of this class.

  • See diacritics to understand printouts of this class.

__abstractmethods__ = frozenset({})[source]
__call__(**input_kwargs) → Solution[source]

Delegates to compute(), respecting any narrowed outputs.

__init__(operations, name, *, outputs=None, predicate: NodePredicate = None, rescheduled=None, endured=None, parallel=None, marshalled=None, node_props=None, renamer=None)[source]

For arguments, see withset() & class attributes.

Raises

ValueError

if dupe operation, with msg:

Operations may only be added once, …

__module__ = 'graphtik.pipeline'
__repr__()[source]

Display more informative names for the Operation class

_abc_impl = <_abc_data object>
compile(inputs=None, outputs=<UNSET>, predicate: NodePredicate = <UNSET>) → ExecutionPlan[source]

Produce a plan for the given args or outputs/predicate narrowed earlier.

Parameters
  • inputs – a string or a list of strings that should be fed to the needs of all operations.

  • outputs – A string or a list of strings with all data asked to compute. If None, all possible intermediate outputs will be kept. If not given, those set by a previous call to withset() or cstor are used.

  • predicate – Will be stored and applied on the next compute() or compile(). If not given, those set by a previous call to withset() or cstor are used.

Returns

the execution plan satisfying the given inputs, outputs & predicate

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If solution does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

compute(named_inputs: Mapping = <UNSET>, outputs: Optional[Union[Collection, str]] = <UNSET>, predicate: NodePredicate = None, solution_class: Type[Solution] = None, layered_solution=None) → Solution[source]

Compile a plan & execute the graph, sequentially or in parallel.

Attention

If intermediate planning is successful, the “global abort run” flag is reset before the execution starts.

Parameters
  • named_inputs – A mapping of names –> values that will be fed to the needs of all operations. Cloned, not modified.

  • outputs – A string or a list of strings with all data asked to compute. If None, all intermediate data will be kept.

  • predicate – filter-out nodes before compiling

  • solution_class – a custom solution factory to use

  • layered_solution

    whether to store operation results into separate solution layers

    Unless overridden by a True/False in set_layered_solution() of configurations, it accepts the following values:

    • When True (False), always keep (don’t keep) results in a separate layer for each operation, regardless of any jsonp dependencies.

    • If None, layers are used only if there are NO jsonp dependencies in the network.

Returns

The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If plan does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

See also Operation.compute().
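The execution half of compute() can be pictured with a deliberately naive sketch: run any operation whose needs are all available, repeat until nothing changes, then check the asked outputs. Op and run_network are hypothetical; the real pipeline compiles a pruned ExecutionPlan first and supports modifiers, rescheduling and parallelism, none of which is modelled here.

```python
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence


class Op(NamedTuple):
    """Hypothetical operation: positional needs in, positional provides out."""
    name: str
    fn: Callable
    needs: Sequence[str]
    provides: Sequence[str]


def run_network(ops: Sequence[Op], named_inputs: Dict[str, Any],
                outputs: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    solution = dict(named_inputs)  # cloned: the caller's mapping is not modified
    pending: List[Op] = list(ops)
    progressed = True
    while pending and progressed:
        progressed = False
        for op in list(pending):  # iterate over a copy while removing
            if all(n in solution for n in op.needs):
                results = op.fn(*(solution[n] for n in op.needs))
                if len(op.provides) == 1:
                    results = (results,)
                solution.update(zip(op.provides, results))
                pending.remove(op)
                progressed = True
    if outputs is None:
        return solution  # keep all intermediate data
    missing = [o for o in outputs if o not in solution]
    if missing:
        raise ValueError(f"Unreachable outputs {missing}")
    return {o: solution[o] for o in outputs}


ops = [
    Op("mul", lambda ab, c: ab * c, ["ab", "c"], ["abc"]),
    Op("add", lambda a, b: a + b, ["a", "b"], ["ab"]),
]
print(run_network(ops, {"a": 1, "b": 2, "c": 3}, outputs=["abc"]))  # → {'abc': 9}
```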

property graph
name: str = None[source]

The name for the new pipeline, used when nesting them.

needs[source]
op_needs[source]
op_provides[source]
outputs = None[source]

The outputs names (possibly None) used to compile the plan.

predicate = None[source]

The node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.
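A 2-argument predicate is just a callable over a node and its data dictionary. The sketch below filters a plain list of (node, data) pairs standing in for the NetworkX graph; filter_nodes and the "stage" property are hypothetical, and with the real library such data would come from node_props.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# (node, node-data) -> bool, as described for Pipeline.predicate.
NodePredicate = Callable[[Any, Dict[str, Any]], bool]


def filter_nodes(nodes: List[Tuple[Any, Dict[str, Any]]],
                 predicate: Optional[NodePredicate] = None) -> List[Any]:
    """Keep the nodes for which predicate(node, data) is truthy; all if None."""
    if predicate is None:
        return [node for node, _ in nodes]
    return [node for node, data in nodes if predicate(node, data)]


# Hypothetical node_props: a "stage" key attached to each operation.
nodes = [("load", {"stage": "io"}), ("fit", {"stage": "ml"}), ("save", {"stage": "io"})]
print(filter_nodes(nodes, lambda op, data: data.get("stage") == "io"))  # → ['load', 'save']
```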

prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs[source]

Delegate to network.

provides[source]
withset(outputs: Optional[Union[Collection, str]] = <UNSET>, predicate: NodePredicate = <UNSET>, *, name=None, rescheduled=None, endured=None, parallel=None, marshalled=None, node_props=None, renamer=None) → Pipeline[source]

Return a copy with a network pruned for the given needs & provides.

Parameters
  • outputs – Will be stored and applied on the next compute() or compile(). If not given, the value of this instance is conveyed to the clone.

  • predicate – Will be stored and applied on the next compute() or compile(). If not given, the value of this instance is conveyed to the clone.

  • name

    the name for the new pipeline:

    • if None, the same name is kept;

    • if True, a distinct name is devised:

      <old-name>-<uid>
      
    • otherwise, the given name is applied.

  • rescheduled – applies rescheduled to all contained operations

  • endured – applies endurance to all contained operations

  • parallel – mark all contained operations to be executed in parallel

  • marshalled – mark all contained operations to be marshalled (useful when run in parallel with a process pool).

  • renamer – see respective parameter in FnOp.withset().

Returns

A narrowed pipeline clone, which MIGHT be empty!

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

graphtik.pipeline._id_bool(b)[source]
graphtik.pipeline._id_tristate_bool(b)[source]
graphtik.pipeline.build_network(operations, rescheduled=None, endured=None, parallel=None, marshalled=None, node_props=None, renamer=None)[source]

The network factory that does operation merging before constructing it.

Parameters

nest – see same-named param in compose()

graphtik.pipeline.compose(name, op1, *operations, outputs: Optional[Union[Collection, str]] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, nest: Union[Callable[[graphtik.base.RenArgs], str], Mapping[str, str], bool, str] = None, node_props=None) → graphtik.pipeline.Pipeline[source]

Merge or nest operations & pipelines into a new pipeline, based on the nest parameter (read below).

Operations given earlier (further to the left) override those following (further to the right), similar to set behavior (and contrary to dict).
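The "first one wins" rule, together with the NULL_OP tombstones described earlier, can be sketched with a dict keyed by operation name. NamedOp, NullOp and merge_operations are hypothetical stand-ins; the real merging is done by build_network() on real operations.

```python
from typing import Dict, Iterable, List, NamedTuple


class NamedOp(NamedTuple):
    """Hypothetical operation stand-in: only its name matters for merging."""
    name: str
    label: str


class NullOp(NamedTuple):
    """Hypothetical stand-in for NULL_OP: a tombstone for an operation name."""
    name: str


def merge_operations(*groups: Iterable) -> List[NamedOp]:
    """Flatten groups left-to-right; the FIRST operation seen for a name wins."""
    chosen: Dict[str, object] = {}
    for group in groups:
        for op in group:
            # setdefault keeps the earlier (further-left) entry, like set insertion.
            chosen.setdefault(op.name, op)
    # Tombstones eliminate same-named operations added later, then vanish.
    return [op for op in chosen.values() if not isinstance(op, NullOp)]


left = [NamedOp("a", "left-a"), NullOp("b")]
right = [NamedOp("a", "right-a"), NamedOp("b", "right-b"), NamedOp("c", "right-c")]
print(merge_operations(left, right))
# → [NamedOp(name='a', label='left-a'), NamedOp(name='c', label='right-c')]
```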

Parameters
  • name (str) – An optional name for the graph being composed by this object.

  • op1 – syntactically force at least 1 operation

  • operations – each argument should be an operation or pipeline instance

  • nest

    a dictionary or callable corresponding to the renamer parameter of Pipeline.withset(), but the callable receives a ren_args with RenArgs.parent set when merging a pipeline, and applies the default nesting behavior (nest_any_node()) on truthies.

    Specifically:

    • if it is a dictionary, it renames any operations & data named as keys into the respective values, like that:

      • if a value is callable or str, it is fed into dep_renamed() (hint: it can be single-arg callable like: (str) -> str)

      • for any other truthy value, it applies default all-nodes nesting;

      Note that you cannot access the “parent” name with dictionaries, you can only apply default all-node nesting by returning a non-string truthy.

    • if it is a callable(), it is given a RenArgs instance to decide the node’s name.

      The callable may return a str for the new-name, or any other true/false to apply default all-nodes nesting.

      For example, to nest just operation’s names (but not their dependencies), call:

      compose(
          ...,
          nest=lambda ren_args: ren_args.typ == "op"
      )
      

      Attention

      The callable SHOULD preserve any modifiers on dependencies, by using dep_renamed() for RenArgs.typ not ending in .jsonpart.

    • If false (default), applies operation merging, not nesting.

    • if true, applies default operation nesting to all types of nodes.

    In all other cases, the names are preserved.

    See also

  • rescheduled – applies rescheduled to all contained operations

  • endured – applies endurance to all contained operations

  • parallel – mark all contained operations to be executed in parallel

  • marshalled – mark all contained operations to be marshalled (useful when run in parallel with a process pool).

  • node_props – Added as-is into NetworkX graph, to provide for filtering by Pipeline.withset(). Also plot-rendering affected if they match Graphviz properties, unless they start with underscore(_)

Returns

Returns a special type of operation class, which represents an entire computation graph as a single operation.

Raises

ValueError

  • If the net cannot produce the asked outputs from the given inputs.

  • If the nest callable/dictionary produced a non-string or empty name (see NetworkPipeline).

graphtik.pipeline.nest_any_node(ren_args: graphtik.base.RenArgs) → str[source]

Nest both operation & data under parent’s name (if given) but NOT jsonparts.

Returns

the nested name of the operation or data
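A plain-string sketch of that rule, assuming a simplified RenArgsSketch whose parent is just the parent pipeline's name; the typ value "needs" and the exact RenArgs fields are illustrative assumptions, while "op" and the ".jsonpart" suffix come from the text above. The real function also preserves modifiers via dep_renamed(), which this sketch does not.

```python
from typing import NamedTuple, Optional


class RenArgsSketch(NamedTuple):
    """Hypothetical, simplified mirror of RenArgs."""
    typ: str                      # e.g. "op", or a dependency type
    name: str                     # the current node name
    parent: Optional[str] = None  # parent pipeline name, when nesting


def nest_any_node_sketch(ren_args: RenArgsSketch) -> str:
    """Prefix operations and data with the parent name, but not jsonparts."""
    if ren_args.typ.endswith(".jsonpart") or not ren_args.parent:
        return ren_args.name
    return f"{ren_args.parent}.{ren_args.name}"


print(nest_any_node_sketch(RenArgsSketch("op", "sum", parent="calc")))  # → calc.sum
```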

Module: modifier

modifiers change dependency behavior during planning & execution.

The needs and provides annotated with modifiers designate, for instance, optional function arguments, or “ghost” sideffects.

Note

This module (along with op & pipeline) is what client code needs to define pipelines at import time without incurring a heavy price (~7ms on a 2019 fast PC).

Diacritics

The representation of modifier-annotated dependencies utilizes a combination of these diacritics:

>   : keyword()
?   : optional()
*   : vararg()
+   : varargs()
$   : accessor (mostly for jsonp)
class graphtik.modifier.Accessor(contains: Callable[[dict, str], Any], getitem: Callable[[dict, str], Any], setitem: Callable[[dict, str, Any], None], delitem: Callable[[dict, str], None], update: Callable[[dict, str, Any], None] = None)[source]

Getter/setter functions to extract/populate values from a solution layer.

Note

Don’t use its attributes directly; prefer instead the functions returned from acc_contains() etc. on any dep (plain strings included).

TODO: drop accessors, push functionality into jsonp alone.

__getnewargs__()[source]

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, contains: Callable[[dict, str], Any], getitem: Callable[[dict, str], Any], setitem: Callable[[dict, str, Any], None], delitem: Callable[[dict, str], None], update: Callable[[dict, str, Any], None] = None)

Create new instance of Accessor(contains, getitem, setitem, delitem, update)

__repr__()[source]

Return a nicely formatted representation string

_asdict()[source]

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)[source]

Make a new Accessor object from a sequence or iterable

_replace(**kwds)[source]

Return a new Accessor object replacing specified fields with new values

property contains

the containment checker, like: dep in sol;

property delitem

the deleter, like: delitem(sol, dep)

property getitem

the getter, like: getitem(sol, dep) -> value

property setitem

the setter, like: setitem(sol, dep, val),

property update

mass updater, like: update(sol, item_values),
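For a plain dict solution layer, each of these callables has a ready-made counterpart in the standard operator module. AccessorSketch below is a hypothetical look-alike of the Accessor namedtuple, not the real class, and it types update as update(sol, item_values), following the property description above.

```python
import operator
from typing import Any, Callable, NamedTuple, Optional


class AccessorSketch(NamedTuple):
    """Hypothetical look-alike of Accessor: how to read/write a solution layer."""
    contains: Callable[[dict, str], Any]       # contains(sol, dep) -> dep in sol
    getitem: Callable[[dict, str], Any]        # getitem(sol, dep) -> value
    setitem: Callable[[dict, str, Any], None]  # setitem(sol, dep, val)
    delitem: Callable[[dict, str], None]       # delitem(sol, dep)
    update: Optional[Callable[[dict, Any], None]] = None  # update(sol, item_values)


# Plain-dict behavior, built from stdlib callables.
dict_accessor = AccessorSketch(
    contains=operator.contains,  # operator.contains(a, b) is `b in a`
    getitem=operator.getitem,
    setitem=operator.setitem,
    delitem=operator.delitem,
    update=dict.update,
)

sol: dict = {}
dict_accessor.setitem(sol, "a", 1)
dict_accessor.update(sol, {"b": 2})
dict_accessor.delitem(sol, "a")
print(sol)  # → {'b': 2}
```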

validate()[source]

Call me early to fail asap (if it must); returns self instance.

graphtik.modifier.JsonpAccessor()[source]

Get/set paths found in the modifier’s “extra” attribute jsonpath.
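Resolving such a path amounts to walking nested dictionaries one step per slash-separated part. The helpers below are hypothetical (dicts only, no list indices, no leading slash) and borrow the ~1 / ~0 escaping of RFC 6901 JSON Pointer; they are not graphtik's implementation.

```python
from typing import Any, List


def split_jsonp(path: str) -> List[str]:
    """Split 'a/b/c' into parts, unescaping '~1' -> '/' then '~0' -> '~'."""
    return [p.replace("~1", "/").replace("~0", "~") for p in path.split("/")]


def jsonp_get(doc: dict, path: str) -> Any:
    """Walk down one nested dict per path part."""
    for part in split_jsonp(path):
        doc = doc[part]
    return doc


def jsonp_set(doc: dict, path: str, value: Any) -> None:
    """Create missing intermediate dicts, then assign the last part."""
    *parents, last = split_jsonp(path)
    for part in parents:
        doc = doc.setdefault(part, {})
    doc[last] = value


sol: dict = {}
jsonp_set(sol, "inputs/x", 1)
print(sol, jsonp_get(sol, "inputs/x"))  # → {'inputs': {'x': 1}} 1
```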

class graphtik.modifier._Modifier(name, _repr, _func, keyword, optional: graphtik.modifier._Optionals, accessor, sideffected, sfx_list, **kw)[source]

Annotate a dependency with a combination of modifiers.

This class is private, because client code should not need to call its cstor, or check if a dependency isinstance(), but use these facilities instead:

Parameters

kw – any extra attributes not needed by execution machinery such as the jsonpath, which is used only by accessor.

Note

Factory function _modifier() may return a plain string, if no other arg but name is given.

static __new__(cls, name, _repr, _func, keyword, optional: graphtik.modifier._Optionals, accessor, sideffected, sfx_list, **kw) → graphtik.modifier._Modifier[source]

Warning, returns None!

__repr__()[source]

Note that modifiers have different repr() from str().

__weakref__

list of weak references to the object (if defined)

_accessor: graphtik.modifier.Accessor = None[source]

An accessor with getter/setter functions to read/write solution values. Any sequence of 2-callables will do.

_func: str[source]

The name of a modifier function here, needed to reconstruct cstor code in cmd

_keyword: str = None[source]

Map my name in needs into this kw-argument of the function. get_keyword() returns it.

_optional: graphtik.modifier._Optionals = None[source]

None for required dependencies; otherwise tells whether it is a regular optional or a varargish one. is_optional() returns it. All regular optionals are also keyword.

_repr: str[source]

pre-calculated representation

_sfx_list: Tuple[Optional[str]] = ()[source]

At least one name(s) denoting the sideffects modification(s) on the sideffected, performed/required by the operation.

  • If it is an empty tuple, it is an abstract sideffect, and is_pure_sfx() returns True.

  • If not empty, is_sfxed() returns True (the sideffected).

_sideffected: str = None[source]

Has value only for sideffects: the pure-sideffect string or the existing sideffected dependency.

property cmd

the code to reproduce it

class graphtik.modifier._Optionals(value)[source]

An enumeration.

graphtik.modifier._modifier(name, *, keyword=None, optional: graphtik.modifier._Optionals = None, accessor=None, sideffected=None, sfx_list=(), jsonp=None, **kw) → Union[str, graphtik.modifier._Modifier][source]

A _Modifier factory that may return a plain str when no other args given.

It decides the final name and _repr for the new modifier by matching the given inputs with the _modifier_cstor_matrix.

Parameters
  • jsonp – If given, it may be the pre-split parts of the jsonp dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

  • kw – Not used here; any given key-value pairs are assigned as _Modifier attributes, for client code to extend its own modifiers.

graphtik.modifier._modifier_cstor_matrix = {700000: None, 700010: ("sfx('%(dep)s')", "sfx('%(dep)s')", 'sfx'), 700011: ("sfxed('%(dep)s', %(sfx)s)", "sfxed(%(acs)s'%(dep)s', %(sfx)s)", 'sfxed'), 700100: ('%(dep)s', "'%(dep)s'($)", 'accessor'), 700111: ("sfxed('%(dep)s', %(sfx)s)", "sfxed('%(dep)s'($), %(sfx)s)", 'sfxed'), 701010: ("sfx('%(dep)s')", "sfx('%(dep)s'(?))", 'sfx'), 702000: ('%(dep)s', "'%(dep)s'(*)", 'vararg'), 702011: ("sfxed('%(dep)s', %(sfx)s)", "sfxed(%(acs)s'%(dep)s'(*), %(sfx)s)", 'sfxed_vararg'), 702100: ('%(dep)s', "'%(dep)s'($*)", 'vararg'), 702111: ("sfxed('%(dep)s', %(sfx)s)", "sfxed('%(dep)s'($*), %(sfx)s)", 'sfxed_vararg'), 703000: ('%(dep)s', "'%(dep)s'(+)", 'varargs'), 703011: ("sfxed('%(dep)s', %(sfx)s)", "sfxed(%(acs)s'%(dep)s'(+), %(sfx)s)", 'sfxed_varargs'), 703100: ('%(dep)s', "'%(dep)s'($+)", 'varargs'), 703111: ("sfxed('%(dep)s', %(sfx)s)", "sfxed('%(dep)s'($+), %(sfx)s)", 'sfxed_varargs'), 710000: ('%(dep)s', "'%(dep)s'(%(acs)s>%(kw)s)", 'keyword'), 710011: ("sfxed('%(dep)s', %(sfx)s)", "sfxed(%(acs)s'%(dep)s'(>%(kw)s), %(sfx)s)", 'sfxed'), 710100: ('%(dep)s', "'%(dep)s'($>%(kw)s)", 'keyword'), 710111: ("sfxed('%(dep)s', %(sfx)s)", "sfxed('%(dep)s'($>%(kw)s), %(sfx)s)", 'sfxed'), 711000: ('%(dep)s', "'%(dep)s'(%(acs)s?%(kw)s)", 'optional'), 711011: ("sfxed('%(dep)s', %(sfx)s)", "sfxed(%(acs)s'%(dep)s'(?%(kw)s), %(sfx)s)", 'sfxed'), 711100: ('%(dep)s', "'%(dep)s'($?%(kw)s)", 'optional'), 711111: ("sfxed('%(dep)s', %(sfx)s)", "sfxed('%(dep)s'($?%(kw)s), %(sfx)s)", 'sfxed')}

Arguments-presence patterns for _Modifier constructor. Combinations missing raise errors.
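
Judging only from the keys listed above, each key seems to pack one digit per constructor argument after a leading 7: keyword, optional kind, accessor, sideffected, sfx_list. The sketch below computes such a key under that reading; the digit order, the hypothetical pattern_key() helper and the optional-kind values (1 for a regular optional, 2 for vararg, 3 for varargs) are inferences from the matrix, not documented graphtik API.

```python
def pattern_key(keyword=False, optional_kind=0, accessor=False,
                sideffected=False, sfx_list=False) -> int:
    """Inferred encoding: a leading 7, then one digit per argument presence."""
    digits = [
        7,
        int(bool(keyword)),
        optional_kind,  # assumed: 0 required, 1 optional, 2 vararg, 3 varargs
        int(bool(accessor)),
        int(bool(sideffected)),
        int(bool(sfx_list)),
    ]
    key = 0
    for digit in digits:
        key = key * 10 + digit
    return key


# Compare with keys that appear in the matrix above.
print(pattern_key())                                  # 700000 -> plain str
print(pattern_key(sideffected=True, sfx_list=True))   # 700011 -> 'sfxed'
print(pattern_key(keyword=True, optional_kind=1))     # 711000 -> 'optional'
```

A combination whose key is absent from the matrix would then be the "missing" case that raises an error.
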

graphtik.modifier.acc_contains(dep) → Callable[[Collection, str], Any][source]

A fn like operator.contains() for any dep (with-or-without accessor)

graphtik.modifier.acc_delitem(dep) → Callable[[Collection, str], None][source]

A fn like operator.delitem() for any dep (with-or-without accessor)

graphtik.modifier.acc_getitem(dep) → Callable[[Collection, str], Any][source]

A fn like operator.getitem() for any dep (with-or-without accessor)

graphtik.modifier.acc_setitem(dep) → Callable[[Collection, str, Any], None][source]

A fn like operator.setitem() for any dep (with-or-without accessor)

graphtik.modifier.accessor(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Annotate a dependency with accessor functions to read/write solution.

Parameters
  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

  • Probably not very useful; see the jsonp() modifier for an integrated use case.

  • To combine it with optional, keyword, etc., use the other modifier factories and pass an argument value to their accessor parameter.

graphtik.modifier.dep_renamed(dep, ren) → Union[graphtik.modifier._Modifier, str][source]

Renames dep as ren, or calls ren (if callable) to decide its name, preserving any keyword() to the old name.

For sideffected it renames the dependency (not the sfx-list); you have to do that manually with a custom renamer-function, if ever the need arises.
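
A minimal sketch of that renaming rule, using a hypothetical frozen dataclass SketchDep in place of a real modifier: only the dependency name changes, while the keyword keeps pointing at the old function-argument name and the sfx_list is carried over untouched.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional, Tuple, Union


@dataclass(frozen=True)
class SketchDep:
    """Hypothetical stand-in for a modifier: a name plus extra properties."""
    name: str
    keyword: Optional[str] = None
    sfx_list: Tuple[str, ...] = ()


def sketch_renamed(dep: SketchDep, ren: Union[str, Callable[[str], str]]) -> SketchDep:
    """Rename only the dependency name; keyword and sfx_list are carried over."""
    new_name = ren(dep.name) if callable(ren) else ren
    return replace(dep, name=new_name)


dep = SketchDep("ORDER", keyword="order", sfx_list=("Prices",))
print(sketch_renamed(dep, lambda n: f"sub/{n}"))
# SketchDep(name='sub/ORDER', keyword='order', sfx_list=('Prices',))
```
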

graphtik.modifier.dep_singularized(dep) → Iterable[Union[str, graphtik.modifier._Modifier]][source]

Yield one sideffected for each sfx in sfx_list, or iterate dep in other cases.
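
For the sideffected case, the expansion can be sketched with a hypothetical (dependency, sfx_list) tuple standing in for an sfxed() modifier; the "other cases" branch is left out of this sketch.

```python
from typing import Iterator, Tuple

# Hypothetical representation of a sideffected: (dependency, sfx_list).
SfxedSketch = Tuple[str, Tuple[str, ...]]


def sketch_singularized(dep: SfxedSketch) -> Iterator[SfxedSketch]:
    """Yield one single-sfx sideffected per item in sfx_list."""
    dependency, sfx_list = dep
    for sfx in sfx_list:
        yield (dependency, (sfx,))


print(list(sketch_singularized(("ORDER", ("Prices", "VAT")))))
# [('ORDER', ('Prices',)), ('ORDER', ('VAT',))]
```

This mirrors how sfxed("ORDER", "Prices", "VAT") shows up as two singular needs in the finalize_prices example of sfxed().
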

graphtik.modifier.dep_stripped(dep) → Union[str, graphtik.modifier._Modifier][source]

Return the _Modifier.sideffected if dep is sideffected, dep otherwise,

conveying all other properties of the original modifier to the stripped dependency.

graphtik.modifier.dependency(dep) → str[source]

Returns the underlying dependency name (just str)

For non-sideffects, it coincides with str(); otherwise, it is the pure-sideffect string or the existing sideffected dependency stored in sideffected.

graphtik.modifier.get_accessor(dep) → bool[source]

Check if dependency has an accessor, and get it (if funcs below are unfit)

Returns

the accessor

graphtik.modifier.get_jsonp(dep) → Optional[List[str]][source]

Check if the dependency is a json pointer path and return its steps.

graphtik.modifier.get_keyword(dep) → Optional[str][source]

Check if a dependency is keyword (and get it).

All non-varargish optionals are “keyword” (including sideffected ones).

Returns

the keyword

graphtik.modifier.is_optional(dep) → Optional[graphtik.modifier._Optionals][source]

Check if a dependency is optional.

Varargish & optional sideffects are included.

Returns

the optional

graphtik.modifier.is_pure_sfx(dep) → bool[source]

Check if it is sideffects but not a sideffected.

graphtik.modifier.is_sfx(dep) → Optional[str][source]

Check if a dependency is sideffects or sideffected.

Returns

the sideffected

graphtik.modifier.is_sfxed(dep) → bool[source]

Check if it is sideffected.

graphtik.modifier.is_vararg(dep) → bool[source]

Check if an optionals dependency is vararg.

graphtik.modifier.is_varargish(dep) → bool[source]

Check if an optionals dependency is varargish.

graphtik.modifier.is_varargs(dep) → bool[source]

Check if an optionals dependency is varargs.

graphtik.modifier.jsonp(name: str, jsonp=None) → graphtik.modifier._Modifier[source]

Control the automatic interpretation of dependencies containing slashes into json pointer path.

Parameters

jsonp

If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

Tip

If accessing pandas, you may pass an already split path with its last part being a callable indexer.

Example:

Let’s use json pointer dependencies along with the default conveyor operation to build an operation copying values around in the solution:

>>> from graphtik import operation, compose, jsonp
>>> copy_values = operation(
...     fn=None,  # ask for the "conveyor op"
...     name="copy a+b-->A+BB",
...     needs=["inputs/a", "inputs/b"],
...     provides=["RESULTS/A", "RESULTS/BB"]
... )
>>> results = copy_values.compute({"inputs": {"a": 1, "b": 2}})
Traceback (most recent call last):
ValueError: Failed preparing needs:
    1. Missing compulsory needs['inputs/a'($), 'inputs/b'($)]!
    +++inputs: ['inputs']
    +++FnOp(name='copy a+b-->A+BB',
                           needs=['inputs/a'($), 'inputs/b'($)],
                           provides=['RESULTS/A'($), 'RESULTS/BB'($)],
                           fn='identity_fn')
>>> results = copy_values.compute({"inputs/a": 1, "inputs/b": 2})
>>> results
{'RESULTS/A'($): 1, 'RESULTS/BB'($): 2}

Notice that the hierarchical dependencies did not work yet, because jsonp modifiers work internally with accessors, and FnOp is unaware of them; it’s the Solution class that supports accessors, and this requires the operation to be wrapped in a pipeline (see below).

Note also that we see the “representation” of the key as 'RESULTS/A'($), but the actual string value is simpler:

>>> str(next(iter(results)))
'RESULTS/A'

The results were not nested, because this modifier works with accessor functions that act only on a real Solution, given to the operation only when it is wrapped in a pipeline (as done below).

Now watch how these paths access deep into solution when the same operation is wrapped in a pipeline:

>>> pipe = compose("copy pipe", copy_values)
>>> sol = pipe.compute({"inputs": {"a": 1, "b": 2}}, outputs="RESULTS")
>>> sol
{'RESULTS': {'A': 1, 'BB': 2}}

graphtik.modifier.keyword(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Annotate a need that (optionally) maps an input name to a keyword-argument name.

The value of a keyword dependency is passed in as keyword argument to the underlying function.

Parameters
  • keyword

    The argument-name corresponding to this named-input. If it is None, it is assumed to be the same as name, so as to always behave like a kw-type arg, and to preserve its fn-name if ever renamed.

    Note

    This extra mapping argument is needed either for optionals (but not varargish), or for functions with keyword-only arguments (like def func(*, foo, bar): ...), since inputs are normally fed into functions by-position, not by-name.

  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

Returns

a _Modifier instance, even if no keyword is given OR it is the same as name.

Example:

In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of keyword():

>>> from graphtik import operation, compose, keyword
>>> @operation(needs=['a', keyword("name-in-inputs", "b")], provides="sum")
... def myadd(a, *, b):
...    return a + b
>>> myadd
FnOp(name='myadd',
                    needs=['a', 'name-in-inputs'(>'b')],
                    provides=['sum'],
                    fn='myadd')
>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops: myadd)
>>> sol = graph.compute({"a": 5, "name-in-inputs": 4})['sum']
>>> sol
9
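
Why the mapping matters for keyword-only arguments can be sketched in plain Python: positional needs go into *args, while mapped needs go into **kwargs under their keyword. The helper call_with_needs() and the (input_name, keyword) tuple are hypothetical stand-ins, not how graphtik represents or feeds needs internally.

```python
from typing import Dict, List, Tuple, Union

# Hypothetical need spec: a plain input name (fed by position), or an
# (input_name, keyword) pair (fed by name, like keyword("name-in-inputs", "b")).
Need = Union[str, Tuple[str, str]]


def call_with_needs(fn, needs: List[Need], inputs: Dict[str, object]):
    args, kwargs = [], {}
    for need in needs:
        if isinstance(need, tuple):
            input_name, kw = need
            kwargs[kw] = inputs[input_name]
        else:
            args.append(inputs[need])
    return fn(*args, **kwargs)


def myadd(a, *, b):  # `b` is keyword-only: it cannot be fed by position
    return a + b


print(call_with_needs(myadd, ["a", ("name-in-inputs", "b")],
                      {"a": 5, "name-in-inputs": 4}))  # 9
```
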

graphtik.modifier.modifier_withset(dep, name=Ellipsis, keyword=Ellipsis, optional: graphtik.modifier._Optionals = Ellipsis, accessor=Ellipsis, sideffected=Ellipsis, sfx_list=Ellipsis, **kw) → Union[graphtik.modifier._Modifier, str][source]

Make a new modifier with changes – handle with care.

Returns

Delegates to _modifier(), so returns a plain string if no args left.

graphtik.modifier.optional(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Annotate optional needs corresponding to defaulted op-function arguments, received only if present in the inputs (when the operation is invoked).

The value of an optional dependency is passed in as a keyword argument to the underlying function.

Parameters
  • keyword – the name of the function argument it corresponds to; if a falsy value is given, the same as name is assumed, so as to always behave like a kw-type arg and to preserve its fn-name if ever renamed.

  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

Example:

>>> from graphtik import operation, compose, optional
>>> @operation(name='myadd',
...            needs=["a", optional("b")],
...            provides="sum")
... def myadd(a, b=0):
...    return a + b

Notice the default value 0 for the b argument, annotated as optional:

>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph',
                 needs=['a', 'b'(?)],
                 provides=['sum'],
                 x1 ops: myadd)

The graph works both with and without b provided in the inputs:

>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}

Like keyword() you may map input-name to a different function-argument:

>>> operation(needs=['a', optional("quasi-real", "b")],
...           provides="sum"
... )(myadd.fn)  # Cannot wrap an operation, its `fn` only.
FnOp(name='myadd',
                    needs=['a', 'quasi-real'(?'b')],
                    provides=['sum'],
                    fn='myadd')
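
The "received only if present" rule can be sketched in plain Python: an absent optional is simply not passed, so the function's own default applies. The helper call_with_optionals() and its {input_name: keyword} mapping are hypothetical illustrations, not graphtik internals.

```python
from typing import Dict, Mapping, Sequence


def call_with_optionals(fn, required: Sequence[str], optionals: Mapping[str, str],
                        inputs: Dict[str, object]):
    """required: input names fed by position; optionals: {input_name: keyword}."""
    args = [inputs[name] for name in required]
    kwargs = {
        kw: inputs[name]
        for name, kw in optionals.items()
        if name in inputs  # absent optionals are skipped, so the fn default applies
    }
    return fn(*args, **kwargs)


def myadd(a, b=0):
    return a + b


print(call_with_optionals(myadd, ["a"], {"b": "b"}, {"a": 5, "b": 4}))  # 9
print(call_with_optionals(myadd, ["a"], {"quasi-real": "b"}, {"a": 5}))  # 5
```
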

graphtik.modifier.sfx(name, optional: bool = None) → graphtik.modifier._Modifier[source]

sideffects denoting modifications beyond the scope of the solution.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (planning) but they have a limited interaction with the operation’s underlying function; specifically:

  • input sideffects must exist in the solution as inputs for an operation depending on it to kick-in, when the computation starts - but this is not necessary for intermediate sideffects in the solution during execution;

  • input sideffects are NOT fed into underlying functions;

  • output sideffects are not expected from underlying functions, unless a rescheduled operation with partial outputs designates a sideffected as canceled by returning it with a falsy value (the operation must return a dictionary).
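
The first two rules for input sideffects can be sketched as follows: every need must be present, but sideffect needs are filtered out before the function is called. SfxName and fn_args() are hypothetical names for this illustration only, not graphtik's sfx() implementation.

```python
from typing import Dict, List, Sequence


class SfxName(str):
    """Hypothetical marker type for sideffect names (not graphtik's sfx())."""


def fn_args(needs: Sequence[str], solution: Dict[str, object]) -> List[object]:
    """Check that every need exists, but feed only the non-sideffect ones."""
    missing = [n for n in needs if n not in solution]
    if missing:
        raise KeyError(f"missing needs: {missing}")
    return [solution[n] for n in needs if not isinstance(n, SfxName)]


needs = [SfxName("lights off"), "music"]
print(fn_args(needs, {"lights off": True, "music": "jazz"}))  # ['jazz']
```
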

Hint

If modifications involve some input/output, prefer the sfxed() modifier.

You may still convey these relationships by including the dependency name in the string - in the end, it’s just a string - but no enforcement of any kind will be done by graphtik, like:

>>> from graphtik import sfx
>>> sfx("price[sales_df]")
sfx('price[sales_df]')

Example:

A typical use-case is to signify changes in some “global” context, outside solution:

>>> from graphtik import operation, compose, sfx
>>> @operation(provides=sfx("lights off"))  # sideffect names can be anything
... def close_the_lights():
...    pass
>>> graph = compose('strip ease',
...     close_the_lights,
...     operation(
...         name='undress',
...         needs=[sfx("lights off")],
...         provides="body")(lambda: "TaDa!")
... )
>>> graph
Pipeline('strip ease',
                 needs=[sfx('lights off')],
                 provides=[sfx('lights off'), 'body'],
                 x2 ops: close_the_lights, undress)
>>> sol = graph()
>>> sol
{'body': 'TaDa!'}


Note

Something has to provide a sideffect for a function needing it to execute - this could be another operation, like above, or the user-inputs; just specify some truthy value for the sideffect:

>>> sol = graph.compute({sfx("lights off"): True})

graphtik.modifier.sfxed(dependency: str, sfx0: str, *sfx_list: str, keyword: str = None, optional: bool = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Annotates a sideffected dependency in the solution sustaining side-effects.

Parameters
  • keyword – the name for the function argument it corresponds. When optional, it becomes the same as name if falsy, so as to behave always like kw-type arg, and to preserve fn-name if ever renamed. When not optional, if not given, it’s all fine.

  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

Like sfx() but annotating a real dependency in the solution, allowing that dependency to be present both in needs and provides of the same function.

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes (emulated with dictionaries):

>>> from graphtik import operation, compose, sfxed
>>> @operation(needs="order_items",
...            provides=sfxed("ORDER", "Items", "Prices"))
... def new_order(items: list) -> "pd.DataFrame":
...     order = {"items": items}
...     # Pretend we get the prices from sales.
...     order['prices'] = list(range(1, len(order['items']) + 1))
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Items"), "vat rate"],
...     provides=sfxed("ORDER", "VAT")
... )
... def fill_in_vat(order: "pd.DataFrame", vat: float):
...     order['VAT'] = [i * vat for i in order['prices']]
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Prices", "VAT")],
...     provides=sfxed("ORDER", "Totals")
... )
... def finalize_prices(order: "pd.DataFrame"):
...     order['totals'] = [p + v for p, v in zip(order['prices'], order['VAT'])]
...     return order

To view all internal dependencies, enable DEBUG in configurations:

>>> from graphtik.config import debug_enabled
>>> with debug_enabled(True):
...     finalize_prices
FnOp(name='finalize_prices',
                    needs=[sfxed('ORDER', 'Prices'), sfxed('ORDER', 'VAT')],
                    op_needs=[sfxed('ORDER', 'Prices'), sfxed('ORDER', 'VAT')],
                    _fn_needs=['ORDER'],
                    provides=[sfxed('ORDER', 'Totals')],
                    op_provides=[sfxed('ORDER', 'Totals')],
                    _fn_provides=['ORDER'],
                    fn='finalize_prices')

Notice that declaring a single sideffected with many items in sfx_list, expands into multiple “singular” sideffected dependencies in the network (check needs & op_needs above).

>>> proc_order = compose('process order', new_order, fill_in_vat, finalize_prices)
>>> sol = proc_order.compute({
...      "order_items": ["toilet-paper", "soap"],
...      "vat rate": 0.18,
... })
>>> sol
{'order_items': ['toilet-paper', 'soap'],
 'vat rate': 0.18,
 'ORDER': {'items': ['toilet-paper', 'soap'],
           'prices': [1, 2],
           'VAT': [0.18, 0.36],
           'totals': [1.18, 2.36]}}


Notice that although many functions consume & produce the same ORDER dependency (check _fn_needs & _fn_provides, above), something that would have formed cycles, the wrapping operations need and provide different sideffected instances, breaking the cycles.

See also

The elaborate example in Hierarchical data and further tricks section.

graphtik.modifier.sfxed_vararg(dependency: str, sfx0: str, *sfx_list: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Like sfxed() + vararg().

graphtik.modifier.sfxed_varargs(dependency: str, sfx0: str, *sfx_list: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Like sfxed() + varargs().

graphtik.modifier.vararg(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

Annotate a varargish need, to be fed as the function’s *args.

Parameters
  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

We designate b & c as vararg arguments:

>>> from graphtik import operation, compose, vararg
>>> @operation(
...     needs=['a', vararg('b'), vararg('c')],
...     provides='sum'
... )
... def addall(a, *b):
...    return a + sum(b)
>>> addall
FnOp(name='addall',
                    needs=['a', 'b'(*), 'c'(*)],
                    provides=['sum'],
                    fn='addall')
>>> graph = compose('mygraph', addall)

The graph works with and without any of b or c inputs:

>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
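
The feeding rule behind these results can be sketched in plain Python: each vararg need that is present contributes one more positional argument after the regular ones. The helper call_with_varargs() is a hypothetical illustration, not graphtik's code.

```python
from typing import Dict, Sequence


def call_with_varargs(fn, positional: Sequence[str], varargs: Sequence[str],
                      inputs: Dict[str, object]):
    """Feed positional needs first, then each present vararg need as one more *arg."""
    args = [inputs[name] for name in positional]
    args.extend(inputs[name] for name in varargs if name in inputs)
    return fn(*args)


def addall(a, *b):
    return a + sum(b)


print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5, "b": 2, "c": 4}))  # 11
print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5}))  # 5
```
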

graphtik.modifier.varargs(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]

A varargish vararg(), naming an iterable value in the inputs.

Parameters
  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path with its last part being a callable indexer.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...    return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...     provides='sum')(enlist)
... )
>>> graph
Pipeline('mygraph',
                 needs=['a', 'b'(?)],
                 provides=['sum'],
                 x1 ops: enlist)

The graph works with or without b in the inputs:

>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist', needs=['a', 'b'(+)], provides=['sum'], fn='enlist')

Attention

To avoid user mistakes, varargs do not accept str inputs (even though strings are iterables):

>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist',
                           needs=['a', 'b'(+)],
                           provides=['sum'],
                           fn='enlist')
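
The validation shown above can be sketched as a hypothetical expand_varargs() helper: an absent input contributes nothing, a non-str iterable is spliced into *args, and a str or non-iterable value is rejected. The error text merely imitates the message above; this is not graphtik's implementation.

```python
from collections.abc import Iterable
from typing import Dict, List


def expand_varargs(name: str, inputs: Dict[str, object]) -> List[object]:
    """Return the items of an iterable input to splice into *args ([] if absent)."""
    if name not in inputs:
        return []
    value = inputs[name]
    # A str is iterable too, but splicing its characters is almost always a mistake.
    if isinstance(value, str) or not isinstance(value, Iterable):
        raise ValueError(f"Expected needs[{name!r}] to be non-str iterables!")
    return list(value)


def enlist(a, *b):
    return [a] + list(b)


inputs = {"a": 5, "b": [2, 20]}
print(enlist(inputs["a"], *expand_varargs("b", inputs)))  # [5, 2, 20]
```
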

See also

The elaborate example in Hierarchical data and further tricks section.

Module: planning

compose network of operations & dependencies, compile the plan.

class graphtik.planning.Network(*operations, graph=None)[source]

A graph of operations that can compile an execution plan.

needs[source]

the “base”, all data-nodes that are not produced by some operation

provides[source]

the “base”, all data-nodes produced by some operation

__abstractmethods__ = frozenset({})[source]
__init__(*operations, graph=None)[source]
Parameters
  • operations – to be added in the graph

  • graph – if None, a new one is created.

Raises

ValueError

if dupe operation, with msg:

Operations may only be added once, …

__module__ = 'graphtik.planning'
__repr__()[source]

Return repr(self).

_abc_impl = <_abc_data object>
_append_operation(graph, operation: graphtik.base.Operation)[source]

Adds the given operation and its data requirements to the network graph.

  • Invoked during constructor only (immutability).

  • Identities are based on the name of the operation, the names of the operation’s needs, and the names of the data it provides.

  • Adds needs, operation & provides, in that order.

Parameters
  • graph – the networkx graph to append to

  • operation – operation instance to append

_apply_graph_predicate(graph, predicate)[source]
_build_execution_steps(pruned_dag, inputs: Collection, outputs: Collection) → List[source]

Create the list of operations and eviction steps, to execute given IOs.

Parameters
  • pruned_dag – The original dag, pruned; not broken.

  • inputs – Not used(!), useless inputs will be evicted when the solution is created.

  • outputs – output names, used to decide whether to add evict-instructions (and which ones)

Returns

the list of operations or dependencies to evict, in computation order

IMPLEMENTATION:

The operation steps are based on the topological sort of the DAG, therefore pruning must have eliminated any cycles.

Then the eviction steps are introduced between the operation nodes (if enabled, and outputs have been asked, or else all outputs are kept), to reduce the solution’s memory footprint as soon as possible while the computation is running.

  • An evict-instruction is inserted on 2 occasions:

    1. whenever a need of an executed op is not used by any other operation further down the DAG.

    2. whenever a provide falls beyond the pruned_dag.

  • For doc chains, either the whole chain (from its root) is evicted, or nothing at all.

  • For eviction purposes, sfxed dependencies are equivalent to their stripped sideffected ones, so these are also inserted in the graph (after sorting, to evade cycles).
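
A simplified reading of the two eviction occasions can be sketched on an already topologically sorted list of hypothetical (name, needs, provides) tuples. It ignores doc chains and sfxed stripping, and it treats "a provide that falls beyond the pruned DAG" as "a provide nobody downstream needs"; it is a sketch, not graphtik's implementation.

```python
from typing import List, Optional, Sequence, Set, Tuple, Union

# Hypothetical step spec: (op_name, needs, provides), already in topological order.
OpSpec = Tuple[str, Sequence[str], Sequence[str]]
Step = Union[str, Tuple[str, str]]


def build_steps(ops: List[OpSpec], outputs: Optional[Set[str]]) -> List[Step]:
    """Interleave op names with ("evict", data) instructions."""
    if not outputs:  # no outputs asked: keep everything, no evictions
        return [name for name, _, _ in ops]
    last_use = {}
    for idx, (_, needs, _) in enumerate(ops):
        for need in needs:
            last_use[need] = idx
    steps: List[Step] = []
    for idx, (name, needs, provides) in enumerate(ops):
        steps.append(name)
        # 1. a need that no later operation uses (and was not asked) is evicted
        for need in dict.fromkeys(needs):
            if last_use[need] == idx and need not in outputs:
                steps.append(("evict", need))
        # 2. a provide that nobody downstream needs (and was not asked) is evicted
        for prov in dict.fromkeys(provides):
            if last_use.get(prov, -1) <= idx and prov not in outputs:
                steps.append(("evict", prov))
    return steps


ops = [("op1", ["a"], ["b"]), ("op2", ["b"], ["c", "x"])]
print(build_steps(ops, outputs={"c"}))
# ['op1', ('evict', 'a'), 'op2', ('evict', 'b'), ('evict', 'x')]
```
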

_cached_plans[source]

Speed up compile() call and avoid a multithreading issue(?) that is occurring when accessing the dag in networkx.

_prune_graph(inputs: Optional[Union[Collection, str]], outputs: Optional[Union[Collection, str]], predicate: Callable[[Any, Mapping], bool] = None) → Tuple[networkx.classes.digraph.DiGraph, Collection, Collection, Collection][source]

Determines what graph steps need to run to get to the requested outputs from the provided inputs:

  • Eliminate steps that are not on a path arriving to requested outputs;

  • Eliminate unsatisfied operations: partial inputs or no outputs needed;

  • Consolidate the list of needs & provides.

Parameters
  • inputs – The names of all given inputs.

  • outputs – The desired output names. This can also be None, in which case the necessary steps are all graph nodes that are reachable from the provided inputs.

  • predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.

Returns

a 3-tuple with the pruned_dag & the needs/provides resolved based on the given inputs/outputs (which might be a subset of all needs/outputs of the returned graph).

Use the returned needs/provides to build a new plan.

Raises

ValueError

  • if outputs asked do not exist in network, with msg:

    Unknown output nodes: …

_topo_sort_nodes(dag) → List[source]

Topo-sort dag by execution order, then by operation-insertion order to break ties.

This means (probably!?) that the first inserted wins the needs, but the last one wins the provides (and the final solution).
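
The tie-breaking idea can be sketched with Kahn's algorithm over plain dicts, picking the earliest-inserted node whenever several are ready. The helper topo_sort_by_insertion() is a hypothetical stdlib-only illustration, not graphtik's implementation (which works on a networkx DAG).

```python
import heapq
from typing import Dict, Hashable, List, Sequence


def topo_sort_by_insertion(nodes: Sequence[Hashable],
                           edges: Dict[Hashable, List[Hashable]]) -> List[Hashable]:
    """Kahn's algorithm; among ready nodes, pick the earliest-inserted one."""
    order = {node: i for i, node in enumerate(nodes)}
    indegree = {node: 0 for node in nodes}
    for dsts in edges.values():
        for dst in dsts:
            indegree[dst] += 1
    ready = [(order[n], n) for n in nodes if indegree[n] == 0]
    heapq.heapify(ready)  # heap keyed by insertion index (unique ints)
    result = []
    while ready:
        _, node = heapq.heappop(ready)
        result.append(node)
        for dst in edges.get(node, ()):
            indegree[dst] -= 1
            if indegree[dst] == 0:
                heapq.heappush(ready, (order[dst], dst))
    if len(result) != len(nodes):
        raise ValueError("graph has a cycle")
    return result


# op2 was inserted first, so it runs before op1; both feed "x".
print(topo_sort_by_insertion(["op2", "op1", "x"], {"op1": ["x"], "op2": ["x"]}))
# ['op2', 'op1', 'x']
```
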

compile(inputs: Optional[Union[Collection, str]] = None, outputs: Optional[Union[Collection, str]] = None, predicate=None) → ExecutionPlan[source]

Create or get from cache an execution-plan for the given inputs/outputs.

See _prune_graph() and _build_execution_steps() for detailed description.

Parameters
  • inputs – A collection with the names of all the given inputs. If None, all inputs that lead to given outputs are assumed. If string, it is converted to a single-element collection.

  • outputs – A collection or the name of the output name(s). If None, all reachable nodes from the given inputs are assumed. If string, it is converted to a single-element collection.

  • predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include; if None, all nodes included.

Returns

the cached or fresh new execution plan

Raises

ValueError

  • If outputs asked do not exist in network, with msg:

    Unknown output nodes: …

  • If solution does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…
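
The "create or get from cache" behavior, together with the string-to-collection rule for inputs/outputs, can be sketched with a hypothetical CachingCompiler wrapper. Keying the cache by frozensets (so that the order of names is ignored) is an assumption of this sketch, not a documented property of graphtik's _cached_plans.

```python
from typing import Callable, Collection, Optional, Union

Items = Optional[Union[Collection[str], str]]


def _as_key(items: Items):
    """None stays None; a single string becomes a 1-element collection."""
    if items is None:
        return None
    if isinstance(items, str):
        items = [items]
    return frozenset(items)


class CachingCompiler:
    """Hypothetical wrapper memoizing plans per (inputs, outputs, predicate)."""

    def __init__(self, build_plan: Callable):
        self._build_plan = build_plan
        self._cached_plans = {}

    def compile(self, inputs: Items = None, outputs: Items = None, predicate=None):
        key = (_as_key(inputs), _as_key(outputs), predicate)
        if key not in self._cached_plans:
            self._cached_plans[key] = self._build_plan(key[0], key[1], predicate)
        return self._cached_plans[key]


calls = []
compiler = CachingCompiler(lambda ins, outs, pred: calls.append((ins, outs)) or len(calls))
plan1 = compiler.compile("a", ["sum"])
plan2 = compiler.compile(["a"], "sum")  # same key once normalized
print(plan1 is plan2, len(calls))  # True 1
```
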

graph[source]

The networkx (Di)Graph containing all operations and dependencies, prior to planning.

prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs[source]

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

  • Clone the graph or merge it with the one in the plot_args (see PlotArgs.clone_or_merge_graph()).

  • For the rest args, prefer PlotArgs.with_defaults() over _replace(), not to override user args.

graphtik.planning._optionalized(graph, data)[source]

Retain optionality of a data node based on all needs edges.

graphtik.planning._yield_also_chained_docs(dig_dag: List[Tuple[str, int]], dag, doc: str, stop_set=()) → Iterable[str][source]

Dig the doc and its sub/super docs, not recursing in those already in stop_set.

Parameters
  • dig_dag – a sequence of 2-tuples like ("in_edges", 0), with the name of a networkx method and which edge-node to pick, 0:= src, 1:= dst

  • stop_set – Stop traversing (and don’t return) doc if already contained in this set.

Returns

the given doc, and any other docs discovered with dig_dag linked with a “subdoc” attribute on their edge, except those sub-trees with a root node already in stop_set. If doc is not in dag, returns empty.
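
Digging subdocs with a stop-set can be sketched over a plain {doc: [subdocs]} mapping instead of a networkx DAG with "subdoc" edge attributes; sketch_yield_also_subdocs() and its parameters are hypothetical names for this illustration.

```python
from typing import Dict, Iterable, Iterator, Sequence, Set


def sketch_yield_also_subdocs(
    nodes: Set[str],
    subdocs_of: Dict[str, Sequence[str]],
    doc: str,
    stop_set: Iterable[str] = (),
) -> Iterator[str]:
    """Yield doc and all its subdocs, depth-first, pruning subtrees in stop_set."""
    stop_set = set(stop_set)
    if doc not in nodes:
        return  # unknown doc: yields nothing
    stack = [doc]
    while stack:
        node = stack.pop()
        if node in stop_set:
            continue  # skip this node and its whole sub-tree
        yield node
        # push children reversed, so they pop in declaration order
        stack.extend(reversed(subdocs_of.get(node, ())))


nodes = {"ORDER", "ORDER/items", "ORDER/prices", "ORDER/prices/0"}
subdocs_of = {"ORDER": ["ORDER/items", "ORDER/prices"], "ORDER/prices": ["ORDER/prices/0"]}
print(list(sketch_yield_also_subdocs(nodes, subdocs_of, "ORDER", {"ORDER/prices"})))
# ['ORDER', 'ORDER/items']
```
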

graphtik.planning._yield_chained_docs(dig_dag: Union[Tuple[str, int], List[Tuple[str, int]]], dag, docs: Iterable[str], stop_set=()) → Iterable[str][source]

Like _yield_also_chained_docs() but digging for many docs at once.

Returns

the given docs, and any other nodes discovered with dig_dag linked with a “subdoc” attribute on their edge, except those sub-trees with a root node already in stop_set.

graphtik.planning.clone_graph_with_stripped_sfxed(graph)[source]

Clone graph including ALSO stripped sideffected deps, with original attrs.

graphtik.planning.collect_requirements(graph) → Tuple[boltons.setutils.IndexedSet, boltons.setutils.IndexedSet][source]

Collect & split datanodes in (possibly overlapping) needs/provides.

graphtik.planning.log = <Logger graphtik.planning (WARNING)>

If this logger is eventually DEBUG-enabled, the string-representation of network-objects (network, plan, solution) is augmented with children’s details.

graphtik.planning.root_doc(dag, doc: str) → str[source]

Return the top-most superdoc, or the same doc if it is not in a chain, or raise if the node is unknown.

graphtik.planning.unsatisfied_operations(dag, inputs: Iterable) → List[source]

Traverse topologically sorted dag to collect un-satisfied operations.

Unsatisfied operations are those suffering from ANY of the following:

  • They are missing at least one compulsory need-input.

    Since the dag is ordered, as soon as we’re on an operation, all its needs have been accounted for, so we can decide its satisfaction.

  • Their provided outputs are not linked to any data in the dag.

    An operation might not have any output link when _prune_graph() has broken them, due to given intermediate inputs.

Parameters
  • dag – a graph with broken edges (those arriving at existing inputs)

  • inputs – an iterable of the names of the input values

Returns

a list of unsatisfied operations to prune
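The two rules above can be illustrated with a single pass over a topologically sorted graph. This is a simplified sketch, not graphtik's implementation: it assumes a hypothetical dict model mapping each operation name to (compulsory needs, provides still linked in the dag), assumes op names never clash with data names, and uses the standard-library graphlib (Python 3.9+) instead of networkx.

```python
from graphlib import TopologicalSorter
from typing import Dict, Iterable, List, Sequence, Tuple

# Hypothetical model: op-name -> (compulsory needs, provides still linked in the dag).
Ops = Dict[str, Tuple[Sequence[str], Sequence[str]]]


def unsatisfied_ops_sketch(ops: Ops, inputs: Iterable[str]) -> List[str]:
    # Bipartite dependency graph, as node -> predecessors:
    # needs precede their op, and the op precedes its provides.
    deps: Dict[str, set] = {}
    for op, (needs, provides) in ops.items():
        deps.setdefault(op, set()).update(needs)
        for out in provides:
            deps.setdefault(out, set()).add(op)

    available = set(inputs)
    unsatisfied = []
    for node in TopologicalSorter(deps).static_order():
        if node not in ops:
            continue  # a data node; availability is tracked in `available`
        needs, provides = ops[node]
        # Topological order guarantees all producers of `needs` were visited already.
        if not provides or not set(needs) <= available:
            unsatisfied.append(node)
        else:
            available.update(provides)
    return unsatisfied


ops = {
    "add": (["a", "b"], ["ab"]),
    "mul": (["ab", "c"], ["abc"]),
    "orphan": (["a"], []),  # all its output links were broken
}
print(sorted(unsatisfied_ops_sketch(ops, ["a", "b"])))  # ['mul', 'orphan']
```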

graphtik.planning.yield_also_chaindocs(dag, doc: str, stop_set=()) → Iterable[str][source]

Calls _yield_also_chained_docs() for both subdocs & superdocs.

graphtik.planning.yield_also_subdocs(dag, doc: str, stop_set=()) → Iterable[str][source]

Calls _yield_also_chained_docs() for subdocs.

graphtik.planning.yield_also_superdocs(dag, doc: str, stop_set=()) → Iterable[str][source]

Calls _yield_also_chained_docs() for superdocs.

graphtik.planning.yield_chaindocs(dag, docs: Iterable[str], stop_set=()) → Iterable[str][source]

Calls _yield_chained_docs() for both subdocs & superdocs.

graphtik.planning.yield_datanodes(nodes) → List[str][source]

May scan dag nodes.

graphtik.planning.yield_node_names(nodes)[source]

Yield either op.name or str(node).

graphtik.planning.yield_ops(nodes) → List[graphtik.base.Operation][source]

May scan (preferably) plan.steps or dag nodes.

graphtik.planning.yield_subdocs(dag, docs: Iterable[str], stop_set=()) → Iterable[str][source]

Calls _yield_chained_docs() for subdocs.

graphtik.planning.yield_superdocs(dag, docs: Iterable[str], stop_set=()) → Iterable[str][source]

Calls _yield_chained_docs() for superdocs.

Module: execution

execute the plan to derive the solution.

class graphtik.execution.ExecutionPlan(net, needs, provides, dag, steps, asked_outs)[source]

A pre-compiled list of operation steps that can execute for the given inputs/outputs.

It is the result of the network’s planning phase.

Note that the execution plan’s attributes are immutable tuples, on purpose.

net

The parent Network

needs

An IndexedSet with the input names needed to exist in order to produce all provides.

provides

An IndexedSet with the output names produced when all inputs are given.

dag

The regular (not broken) pruned subgraph of net-graph.

steps

The tuple of operation-nodes & instructions needed to evaluate the given inputs & asked outputs, free memory and avoid overwriting any given intermediate inputs.

asked_outs

When true, evictions may kick in (unless disabled by configurations); otherwise, evictions (along with the perfect-evictions check) are skipped.

__module__ = 'graphtik.execution'
__repr__()[source]

Return a nicely formatted representation string

_check_if_aborted(solution)[source]
_execute_sequential_method(solution: graphtik.execution.Solution)[source]

This method runs the graph one operation at a time in a single thread

Parameters

solution – must contain the input values only, gets modified

_execute_thread_pool_barrier_method(solution: graphtik.execution.Solution)[source]

This method runs the graph using a parallel pool of thread executors. You may achieve lower total latency with this method if your graph is sufficiently subdivided into operations.

Parameters

solution – must contain the input values only, gets modified
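One common way to run a graph on a thread pool with "barriers" is in rounds: submit every operation whose needs are already in the solution, wait for all of them to finish, merge their results, and repeat. The sketch below illustrates that idea with concurrent.futures; the op model (name mapped to needs, provides and a function returning a tuple) and the name run_with_barriers_sketch are hypothetical, and this is not a copy of graphtik's method.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable, Dict, Sequence, Tuple

# Hypothetical op model: name -> (needs, provides, fn returning a tuple of outputs).
Op = Tuple[Sequence[str], Sequence[str], Callable]


def run_with_barriers_sketch(ops: Dict[str, Op], solution: dict, max_workers: int = 4) -> dict:
    pending = dict(ops)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending:
            ready = [
                name for name, (needs, _, _) in pending.items()
                if all(k in solution for k in needs)
            ]
            if not ready:
                break  # nothing else can run: the remaining ops are unsatisfied
            futures = {
                name: pool.submit(pending[name][2], *(solution[k] for k in pending[name][0]))
                for name in ready
            }
            wait(futures.values())  # the "barrier": the whole round must finish first
            for name, fut in futures.items():
                _, provides, _ = pending.pop(name)
                solution.update(zip(provides, fut.result()))  # re-raises op failures
    return solution  # the given solution dict is modified in place


ops = {
    "add": (["a", "b"], ["ab"], lambda a, b: (a + b,)),
    "neg": (["a"], ["na"], lambda a: (-a,)),
    "mul": (["ab", "na"], ["out"], lambda x, y: (x * y,)),
}
print(run_with_barriers_sketch(ops, {"a": 2, "b": 3})["out"])  # -10
```

Here "add" and "neg" run concurrently in the first round, and "mul" runs alone in the second.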

_handle_task(future, op, solution) → None[source]

Un-dill parallel task results (if marshalled), and update solution / handle failure.

_prepare_tasks(operations, solution, pool, global_parallel, global_marshal) → Union[Future, graphtik.execution._OpTask, bytes][source]

Combine ops+inputs, apply marshalling, and submit to the execution pool (or not), based on global/per-op configs.

execute(named_inputs, outputs=None, *, name='', solution_class=None, layered_solution=None) → graphtik.execution.Solution[source]
Parameters
  • named_inputs – A mapping of names –> values that must contain at least the compulsory inputs that were specified when the plan was built (but cannot enforce that!). Cloned, not modified.

  • outputs – If not None, they are only checked for whether they can be produced, based on provides, raising if not.

  • name – name of the pipeline used for logging

  • solution_class – a custom solution factory to use

  • layered_solution

    whether to store operation results into separate solution layer

    Unless overridden by a True/False in set_layered_solution() of configurations, it accepts the following values:

    • When True(False), always keep(don’t keep) results in a separate layer for each operation, regardless of any jsonp dependencies.

    • If None, layers are used only if there are NO jsonp dependencies in the network.

Returns

The solution which contains the results of each operation executed +1 for inputs in separate dictionaries.

Raises

ValueError

  • If plan does not contain any operations, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

property graph
prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs[source]

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

  • Clone the graph or merge it with the one in the plot_args (see PlotArgs.clone_or_merge_graph()).

  • For the remaining args, prefer PlotArgs.with_defaults() over _replace(), so as not to override user args.

validate(inputs: Optional[Union[Collection, str]], outputs: Optional[Union[Collection, str]])[source]

Raise on invalid inputs or outputs, or when the graph has no operations.

Raises

ValueError

  • If cannot produce any outputs from the given inputs, with msg:

    Unsolvable graph: …

  • If given inputs mismatched plan’s needs, with msg:

    Plan needs more inputs…

  • If net cannot produce asked outputs, with msg:

    Unreachable outputs…

class graphtik.execution.Solution(plan, input_values: dict, layered_solution=None)[source]

The solution chain-map and execution state (e.g. overwrite or canceled operation)

It inherits collections.ChainMap, to keep a separate dictionary for each operation executed, +1 for the user inputs.

__contains__(key)[source]
__copy__()[source]

New ChainMap or subclass with a new copy of maps[0] and refs to maps[1:]

__delitem__(key)[source]
__getitem__(key)[source]
__init__(plan, input_values: dict, layered_solution=None)[source]

Initialize a ChainMap by setting maps to the given mappings. If no mappings are provided, a single empty dictionary is used.

__module__ = 'graphtik.execution'
__repr__()[source]

Return repr(self).

_layers: Optional[Mapping][source]

A map of operations to their results (initially empty dicts). The result dictionaries pre-populate this (self) chainmap: the 1st map (which wins all reads) belongs to the last operation executed, and the last map is the input_values dict.

If network contains jsonp dependencies, by default layers are disabled, and this is None.
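The layering above can be reproduced with a plain collections.ChainMap. This stand-alone illustration does not use the Solution class; the operation names and values are hypothetical. It only shows why pre-populating the chain with one empty dict per operation, in reverse execution order, makes the last operation's results win all reads while the inputs stay untouched in the last map.

```python
from collections import ChainMap

# Hypothetical inputs and operation names (op1 executes before op2).
inputs = {"a": 1, "b": 2}
layers = {"op1": {}, "op2": {}}  # initially empty result dicts, one per operation

# Reverse execution order: op2's layer first (wins all reads), inputs last.
solution = ChainMap(layers["op2"], layers["op1"], inputs)

layers["op1"].update({"ab": 3})           # op1 executes
layers["op2"].update({"ab": 30, "c": 4})  # op2 executes, overwriting "ab"

print(solution["ab"])               # 30  (the last operation executed wins)
print(solution.maps[-1] is inputs)  # True (user inputs are kept apart, unmodified)
```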

_reschedule(dag, reason, op)[source]

Re-prune dag, and then update and return any newly-canceled ops.

Parameters
_update_op_outs(op, outputs: Mapping)[source]

Mass update values on the solution layer for the given op.

Observes any accessors in the k…

It is a separate method, to allow subclasses with custom accessor logic.

canceled[source]

A sorted set of canceled operations due to upstream failures.

check_if_incomplete() → Optional[graphtik.base.IncompleteExecutionError][source]

Return an IncompleteExecutionError if pipeline operations failed/were canceled.

dag[source]

Cloned from the plan, it will be modified by removing the downstream edges of:

  • any partial outputs not provided, or

  • all provides of failed operations.

FIXME: SPURIOUS dag reversals on multi-threaded runs (see below next assertion)!

debugstr()[source]
executed[source]

A dictionary with keys the operations executed, and values their status:

  • no key: not executed yet

  • value None: execution ok

  • value Exception: execution failed

  • value Collection: canceled provides

property graph
is_failed(op)[source]
operation_executed(op, outputs)[source]

Invoked once per operation, with its results.

It updates executed with the operation status and, if outputs were partial, updates canceled with the unsatisfied ops downstream of op.

Parameters
  • op – the operation that completed ok

  • outputs – The named values the op actually produced, which may be a subset of its provides. Sideffects are not considered.

operation_failed(op, ex)[source]

Invoked once per operation, with the exception it raised.

It updates executed with the operation status and canceled with the unsatisfied ops downstream of op.

property overwrites

The data in the solution that exist more than once.

A “virtual” property to a dictionary with keys the names of values that exist more than once, and values, all those values in a list, ordered in reverse compute order (1st is the last one computed).
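Given a layered chain-map like the one a Solution keeps, the overwrites can be derived by scanning every layer from the first (most recent) to the last. The sketch below works on a plain collections.ChainMap; the function name overwrites_sketch is hypothetical and this is not the library's code.

```python
from collections import ChainMap
from typing import Dict, List


def overwrites_sketch(chain: ChainMap) -> Dict[str, List]:
    """Map each name found in more than one layer to all its values, newest first."""
    values: Dict[str, List] = {}
    for layer in chain.maps:  # maps[0] holds the most recently computed results
        for name, value in layer.items():
            values.setdefault(name, []).append(value)
    return {name: vals for name, vals in values.items() if len(vals) > 1}


chain = ChainMap({"ab": 30, "c": 4}, {"ab": 3}, {"a": 1, "ab": 0})
print(overwrites_sketch(chain))  # {'ab': [30, 3, 0]}
```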

plan[source]

the plan that produced this solution

prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs[source]

delegate to plan, with solution

scream_if_incomplete()[source]

Raise an IncompleteExecutionError if pipeline operations failed/were canceled.

solid[source]

A unique identifier to distinguish separate flows in execution logs.

class graphtik.execution._OpTask(op, sol, solid, result=<UNSET>)[source]

Mimic concurrent.futures.Future for sequential execution.

This intermediate class is needed to solve pickling issues with the process executor.

__call__()[source]

Call self as a function.

__init__(op, sol, solid, result=<UNSET>)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'graphtik.execution'
__repr__()[source]

Return repr(self).

__slots__ = ('op', 'sol', 'solid', 'result')
get()[source]

Call self as a function.

logname = 'graphtik.execution'
marshalled()[source]
op
result
sol
solid
graphtik.execution._do_task(task)[source]

Un-dill the simpler _OpTask & Dill the results, to pass through pool-processes.

See https://stackoverflow.com/a/24673524/548792

graphtik.execution._isDebugLogging()[source]
graphtik.execution.log = <Logger graphtik.execution (WARNING)>

If this logger is eventually DEBUG-enabled, the string-representation of network-objects (network, plan, solution) is augmented with children’s details.

graphtik.execution.task_context: None = <ContextVar name='task_context'>

Populated with the _OpTask for the currently executing operation. It does not work for parallel execution.

See also

The elaborate example in Hierarchical data and further tricks section
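The general pattern behind a variable like task_context is a contextvars.ContextVar that the executor sets right before calling an operation and resets right after, so the operation can look up "its" task without extra arguments. The sketch below shows that pattern with the standard library only; current_task, run_task and my_op are hypothetical names, and the value stored is a plain string instead of an _OpTask.

```python
import contextvars

# Hypothetical stand-in for the module-level `task_context` ContextVar.
current_task = contextvars.ContextVar("current_task", default=None)


def run_task(task_name: str, fn):
    token = current_task.set(task_name)  # populate before calling the operation
    try:
        return fn()
    finally:
        current_task.reset(token)  # restore the previous value afterwards


def my_op():
    # Inside the operation, the current task is available without extra arguments.
    return f"running inside {current_task.get()}"


print(run_task("op1", my_op))  # running inside op1
print(current_task.get())      # None
```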

Module: plot

plotting handled by the active plotter & current theme.

class graphtik.plot.Plotter(theme: graphtik.plot.Theme = None, **styles_kw)[source]

a plotter renders diagram images of plottables.

default_theme[source]

The customizable Theme instance controlling theme values & dictionaries for plots.

build_pydot(plot_args: graphtik.base.PlotArgs) → pydot.Dot[source]

Build a pydot.Dot out of a Network graph/steps/inputs/outputs and return it, to be fed into Graphviz to render.

See Plottable.plot() for the arguments, sample code, and the legend of the plots.

legend(filename=None, jupyter_render: Mapping = None, theme: graphtik.plot.Theme = None)[source]

Generate a legend for all plots (see Plottable.plot() for args)

See Plotter.render_pydot() for the rest arguments.

plot(plot_args: graphtik.base.PlotArgs)[source]
render_pydot(dot: pydot.Dot, filename=None, jupyter_render: str = None)[source]

Render a pydot.Dot instance with Graphviz in a file and/or in a matplotlib window.

Parameters
  • dot – the pre-built pydot.Dot instance

  • filename (str) –

    Write a file or open a matplotlib window.

    • If it is a string or file, the diagram is written into the file-path

      Common extensions are .png .dot .jpg .jpeg .pdf .svg; call plot.supported_plot_formats() for more.

    • If it IS True, opens the diagram in a matplotlib window (requires matplotlib package to be installed).

    • If it equals -1, it mat-plots but does not open the window.

    • Otherwise, just return the pydot.Dot instance.

    See also

    PlotArgs.filename

  • jupyter_render

    a nested dictionary controlling the rendering of graph-plots in Jupyter cells. If None, defaults to default_jupyter_render; you may modify those in place and they will apply for all future calls (see Jupyter notebooks).

    You may increase the height of the SVG cell output with something like this:

    plottable.plot(jupyter_render={"svg_element_styles": "height: 600px; width: 100%"})
    

Returns

the matplotlib image if filename=-1, or the given dot annotated with any jupyter-rendering configurations given in jupyter_render parameter.

See Plottable.plot() for sample code.

with_styles(**kw) → graphtik.plot.Plotter[source]

Returns a cloned plotter with a deep-copied theme modified as given.

See also Theme.withset().

class graphtik.plot.Ref(ref, default=Ellipsis)[source]

Deferred attribute reference resolve()d on some object(s).

default
ref
resolve(*objects, default=Ellipsis)[source]

Makes re-based clone to the given class/object.
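A deferred reference stores only an attribute name, and looks up its value later against whatever objects are at hand (for styles, the current nx_attrs first and then the theme). The class below is a hypothetical, simplified MiniRef written to illustrate the idea; it is not graphtik's Ref and does not reproduce its re-basing behavior.

```python
from typing import Any


class MiniRef:
    """Hypothetical, simplified deferred attribute reference (not graphtik's Ref)."""

    def __init__(self, ref: str, default: Any = ...):
        self.ref = ref
        self.default = default

    def resolve(self, *objects, default: Any = ...) -> Any:
        # Try each object in turn; the first one holding the name wins.
        for obj in objects:
            if isinstance(obj, dict) and self.ref in obj:
                return obj[self.ref]
            if hasattr(obj, self.ref):
                return getattr(obj, self.ref)
        if default is not ...:
            return default
        if self.default is not ...:
            return self.default
        raise AttributeError(f"cannot resolve {self.ref!r}")


class MyTheme:  # hypothetical theme with a single attribute
    fill_color = "wheat"


style = {"fillcolor": MiniRef("fill_color"), "shape": "rect"}
nx_attrs: dict = {}  # nothing to override here, so the theme value is used
resolved = {
    k: v.resolve(nx_attrs, MyTheme()) if isinstance(v, MiniRef) else v
    for k, v in style.items()
}
print(resolved)  # {'fillcolor': 'wheat', 'shape': 'rect'}
```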

class graphtik.plot.StylesStack(plot_args: graphtik.base.PlotArgs, named_styles: List[Tuple[str, dict]], ignore_errors: bool = False)[source]

A mergeable stack of dicts preserving provenance and style expansion.

The merge() method joins the collected stack of styles into a single dictionary and, if DEBUG (see remerge()), inserts their provenance in a 'tooltip' attribute; any lists are merged (important for multi-valued Graphviz attributes like style).

Then they are expanded.

add(name, kw=Ellipsis)[source]

Adds a style by name from style-attributes, or explicitly with its provenance, or fails early.

Parameters
  • name – Either the provenance name when the kw styles is given, OR just an existing attribute of style instance.

  • kw – if given and is None/empty, ignored.

expand(style: dict) → dict[source]

Apply style expansions on an already merged style.

  • Resolve any Ref instances, first against the current nx_attrs and then against the attributes of the current theme.

  • Render jinja2 templates (see _expand_styles()) with template-arguments all the attributes of the plot_args instance in use (hence much more flexible than Ref).

  • Call any callables with current plot_args and replace them by their result (even more flexible than templates).

  • Any None results above are discarded.

  • Workaround pydot/pydot#228 pydot-cstor not supporting styles-as-lists.

property ignore_errors

When true, keep merging despite expansion errors.

merge(debug=None) → dict[source]

Recursively merge named_styles and expand() the result style.

Parameters

debug – When not None, override config.is_debug() flag. When debug is enabled, tooltips are overridden with provenance & nx_attrs.

Returns

the merged styles

property named_styles

A list of 2-tuples: (name, dict) containing the actual styles along with their provenance.

property plot_args

current item’s plot data with at least PlotArgs.theme attribute.

stack_user_style(nx_attrs: dict, skip=())[source]

Appends keys in nx_attrs starting with USER_STYLE_PREFFIX into the stack.

class graphtik.plot.Theme(*, _prototype: Optional[graphtik.plot.Theme] = None, **kw)[source]

The poor man’s css-like plot theme (see also StylesStack).

To use the values contained in theme-instances, stack them in a StylesStack, and StylesStack.merge() them with style expansions.

Attention

It is recommended to use other means for Plot customizations instead of modifying directly theme’s class-attributes.

All Theme class-attributes are deep-copied when constructing new instances, to avoid modifications by mistake while attempting to update instance-attributes instead (hint: almost all its attributes are containers, i.e. dicts). Therefore any class-attributes modification will be ignored until a new Theme instance from the patched class is used.
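The deep-copy behavior described above can be illustrated with a tiny hypothetical MiniTheme class: its constructor snapshots the class-attributes into each instance with copy.deepcopy, so mutating an instance's dicts never leaks into the class, and patching the class only affects instances created afterwards. The withset() here mirrors the documented "deep-clone modified by kw" idea; none of this is graphtik's actual code.

```python
import copy


class MiniTheme:
    """Hypothetical theme whose container class-attributes are deep-copied per instance."""

    kw_data = {"shape": "rect"}

    def __init__(self, **kw):
        # Snapshot the (possibly patched) public, non-callable class-attributes.
        for name, value in vars(type(self)).items():
            if not name.startswith("_") and not callable(value):
                setattr(self, name, copy.deepcopy(value))
        for name, value in kw.items():
            setattr(self, name, value)

    def withset(self, **kw) -> "MiniTheme":
        clone = copy.deepcopy(self)  # deep-clone, then apply the modifications
        for name, value in kw.items():
            setattr(clone, name, value)
        return clone


theme = MiniTheme()
theme.kw_data["shape"] = "box"  # mutates the instance copy only
print(MiniTheme.kw_data)        # {'shape': 'rect'}
```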

arch_url = 'https://graphtik.readthedocs.io/en/latest/arch.html'

the url to the architecture section explaining graphtik glossary, linked by legend.

broken_color = 'Red'
canceled_color = '#a9a9a9'
evicted = '#006666'
failed_color = 'LightCoral'
fill_color = 'wheat'
kw_data = {'label': <Template memory:7fbda57f2828>, 'margin': '0.04,0.02', 'shape': 'rect'}

Reduce margins, since sideffects take a lot of space (default margin: x=0.11, y=0.055)

kw_data_canceled = {'fillcolor': Ref('canceled_color'), 'style': ['filled'], 'tooltip': '(canceled)'}
kw_data_evicted = {'penwidth': '3', 'tooltip': '(evicted)'}
kw_data_in_solution = {'fillcolor': Ref('fill_color'), 'style': ['filled'], 'tooltip': <function make_data_value_tooltip>}
kw_data_inp = {}[source]
kw_data_inp_only = {'shape': 'invhouse'}
kw_data_io = {'shape': 'hexagon'}
kw_data_out = {}[source]
kw_data_out_only = {'shape': 'house'}
kw_data_overwritten = {'fillcolor': Ref('overwrite_color'), 'style': ['filled']}
kw_data_pruned = {'color': Ref('pruned_color'), 'fontcolor': Ref('pruned_color'), 'tooltip': '(pruned)'}
kw_data_sideffect = {'color': 'blue', 'fontcolor': 'blue'}
kw_data_sideffected = {'label': <Template memory:7fbda5815908>}
kw_data_to_evict = {'color': Ref('evicted'), 'fontcolor': Ref('evicted'), 'style': ['dashed'], 'tooltip': '(to evict)'}
kw_edge = {'headport': 'n', 'tailport': 's'}
kw_edge_alias = {'fontsize': 11, 'label': <Template memory:7fbda64c7a20>}

Added conditionally if alias_of found in edge-attrs.

kw_edge_broken = {'color': Ref('broken_color')}
kw_edge_endured = {'style': ['dashed']}
kw_edge_head_op = {'arrowtail': 'inv', 'dir': 'back'}
kw_edge_mapping_keyword = {'fontname': 'italic', 'fontsize': 11, 'label': <Template memory:7fbda518c3c8>}

Rendered if keyword exists in nx_attrs.

kw_edge_optional = {'style': ['dashed']}
kw_edge_pruned = {'color': Ref('pruned_color')}
kw_edge_rescheduled = {'style': ['dashed']}
kw_edge_sideffect = {'color': 'blue'}
kw_edge_subdoc = {'arrowtail': 'odot', 'color': Ref('subdoc_color'), 'dir': 'back', 'headport': 'nw', 'tailport': 'se'}
kw_edge_tail_op = {}[source]
kw_graph = {'fontname': 'italic', 'graph_type': 'digraph'}
kw_graph_plottable_type = {'ExecutionPlan': {}, 'FnOp': {}, 'Network': {}, 'Pipeline': {}, 'Solution': {}}

styles per plot-type

kw_graph_plottable_type_unknown = {}[source]

For when the type-name of PlotArgs.plottable is not found in kw_plottable_type (or is missing altogether).

kw_legend = {'URL': 'https://graphtik.readthedocs.io/en/latest/_images/GraphtikLegend.svg', 'fillcolor': 'yellow', 'name': 'legend', 'shape': 'component', 'style': 'filled', 'target': '_blank'}

If the 'URL' key is missing/empty, no legend icon is included in plots.

kw_op = {'name': <function Theme.<lambda>>, 'shape': 'plain', 'tooltip': <function Theme.<lambda>>}

props for operation node (outside of the label)

kw_op_canceled = {'fillcolor': Ref('canceled_color'), 'tooltip': '(canceled)'}
kw_op_endured = {'badges': ['!'], 'penwidth': Ref('resched_thickness'), 'style': ['dashed'], 'tooltip': '(endured)'}
kw_op_executed = {'fillcolor': Ref('fill_color')}
kw_op_failed = {'fillcolor': Ref('failed_color'), 'tooltip': <Template memory:7fbdabb7cb70>}
kw_op_label = {'fn_link_target': '_top', 'fn_name': <function Theme.<lambda>>, 'fn_tooltip': <function make_fn_tooltip>, 'fn_truncate': Ref('truncate_args'), 'fn_url': Ref('fn_url'), 'op_link_target': '_top', 'op_name': <function Theme.<lambda>>, 'op_tooltip': <function make_op_tooltip>, 'op_truncate': Ref('truncate_args'), 'op_url': Ref('op_url')}

props of the HTML-Table label for Operations

kw_op_marshalled = {'badges': ['&']}
kw_op_parallel = {'badges': ['|']}
kw_op_pruned = {'color': Ref('pruned_color'), 'fontcolor': Ref('pruned_color')}
kw_op_rescheduled = {'badges': ['?'], 'penwidth': Ref('resched_thickness'), 'style': ['dashed'], 'tooltip': '(rescheduled)'}
kw_op_returns_dict = {'badges': ['}']}
kw_step = {'arrowhead': 'vee', 'color': Ref('steps_color'), 'fontcolor': Ref('steps_color'), 'fontname': 'bold', 'fontsize': 18, 'splines': True, 'style': 'dotted'}
op_bad_html_label_keys = {'label', 'shape', 'style'}

Keys to ignore from operation styles & node-attrs, because they are handled internally by HTML-Label, and/or interact badly with that label.

op_badge_styles = {'badge_styles': {'!': {'URL': 'https://graphtik.readthedocs.io/en/latest/arch.html#term-endured', 'bgcolor': '#04277d', 'color': 'white', 'tooltip': 'endured'}, '&': {'URL': 'https://graphtik.readthedocs.io/en/latest/arch.html#term-marshalling', 'bgcolor': '#4e3165', 'color': 'white', 'tooltip': 'marshalled'}, '?': {'URL': 'https://graphtik.readthedocs.io/en/latest/arch.html#term-partial-outputs', 'bgcolor': '#fc89ac', 'color': 'white', 'tooltip': 'rescheduled'}, '|': {'URL': 'https://graphtik.readthedocs.io/en/latest/arch.html#term-parallel-execution', 'bgcolor': '#b1ce9a', 'color': 'white', 'tooltip': 'parallel'}, '}': {'URL': 'https://graphtik.readthedocs.io/en/latest/arch.html#term-returns-dictionary', 'bgcolor': '#cc5500', 'color': 'white', 'tooltip': 'returns_dict'}}}

Operation styles may specify one or more “letters” in a badges list item, as long as the “letter” is contained in the badge_styles dictionary above.

op_template = <Template memory:7fbda60094e0>

Try to mimic regular Graphviz node attributes (see examples in test.test_plot.test_op_template_full() for params). TODO: fix the jinja2 template being un-picklable!

overwrite_color = 'SkyBlue'
pruned_color = '#d3d3d3'
resched_thickness = 4
show_chaindocs = False[source]

When true, plot also hierarchical data nodes that are not directly linked to operations.

show_steps = False[source]

When true, plot also execution steps, linking operations and evictions with green dotted lines labeled with numbers denoting the execution order.

steps_color = '#00bbbb'
subdoc_color = '#8B4513'
static theme_attributes(obj) → dict[source]

Extract public data attributes of a Theme instance.

truncate_args = ((23, True), {'reverse': True})

args for jinja2 patched truncate filter, above.

withset(**kw) → graphtik.plot.Theme[source]

Returns a deep-clone modified by kw.

graphtik.plot.USER_STYLE_PREFFIX = 'graphviz.'

Any nx-attributes starting with this prefix are appended verbatim as graphviz attributes, by stack_user_style().

graphtik.plot.active_plotter_plugged(plotter: graphtik.plot.Plotter) → None[source]

Like set_active_plotter() as a context-manager, resetting back to old value.

graphtik.plot.as_identifier(s)[source]

Convert string into a valid ID, both for html & graphviz.

It must not rely on Graphviz’s HTML-like string, because it would not be a valid HTML-ID.

graphtik.plot.default_jupyter_render = {'svg_container_styles': '', 'svg_element_styles': 'width: 100%; height: 300px;', 'svg_pan_zoom_json': '{controlIconsEnabled: true, fit: true}'}

A nested dictionary controlling the rendering of graph-plots in Jupyter cells, such as those returned from Plottable.plot() (currently as SVGs). Either modify it in place, or pass another one in the respective methods.

The following keys are supported.

Parameters
  • svg_pan_zoom_json

    arguments controlling the rendering of a zoomable SVG in Jupyter notebooks, as defined in https://github.com/ariutta/svg-pan-zoom#how-to-use; if None, defaults to the string (maps are also supported):

    "{controlIconsEnabled: true, zoomScaleSensitivity: 0.4, fit: true}"
    

  • svg_element_styles

    mostly for sizing the zoomable SVG in Jupyter notebooks. Inspect & experiment on the html page of the notebook with browser tools. If None, defaults to the string (maps are also supported):

    "width: 100%; height: 300px;"
    

  • svg_container_styles – like svg_element_styles; if None, defaults to an empty string (maps are also supported).

graphtik.plot.get_active_plotter() → graphtik.plot.Plotter[source]

Get the previously active plotter instance or the default one.

graphtik.plot.get_node_name(nx_node, raw=False)[source]
graphtik.plot.graphviz_html_string(s, *, repl_nl=None, repl_colon=None, xmltext=None)[source]

Workaround pydot parsing of node-id & labels by encoding as HTML.

  • pydot library does not quote DOT-keywords anywhere (pydot#111).

  • Char : on node-names denote port/compass-points and break IDs (pydot#224).

  • Non-strings are not quote_if_necessary by pydot.

  • NLs in tooltips of HTML-Table labels need substitution with the XML-entity.

  • HTML-Label attributes (xmlattr=True) need both html-escape & quote.

Attention

It does not correctly handle ID:port:compass-point format.

See https://www.graphviz.org/doc/info/lang.html

graphtik.plot.is_nx_node_dependent(graph, nx_node)[source]

Return true if node’s edges are not subdoc only.

graphtik.plot.legend(filename=None, show=None, jupyter_render: Mapping = None, plotter: graphtik.plot.Plotter = None)[source]

Generate a legend for all plots (see Plottable.plot() for args)

Parameters
  • plotter – override the active plotter

  • show

    Deprecated since version v6.1.1: Merged with filename param (filename takes precedence).

See Plotter.render_pydot() for the rest arguments.

graphtik.plot.make_data_value_tooltip(plot_args: graphtik.base.PlotArgs)[source]

Called on datanodes, when solution exists.

graphtik.plot.make_fn_tooltip(plot_args: graphtik.base.PlotArgs)[source]

the sources of the operation-function

graphtik.plot.make_op_tooltip(plot_args: graphtik.base.PlotArgs)[source]

the string-representation of an operation (name, needs, provides)

graphtik.plot.make_template(s)[source]

Makes dedented jinja2 templates supporting extra escape filters for Graphviz:

ee

Like the default escape filter e, but Nones/empties evaluate to false. Needed because the default escape filter breaks the xmlattr filter with Nones.

eee

Escape for when writing inside HTML-strings. Collapses Nones/empties (unlike the default e).

hrefer

Dubious escape for when writing URLs inside Graphviz attributes. Does NOT collapse Nones/empties (like the default e).

graphtik.plot.quote_html_tooltips(s)[source]

Graphviz HTML-Labels ignore NLs & TABs.
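Since newlines and tabs are ignored inside HTML-labels, one way to preserve them in tooltips is to replace them with numeric XML character references, as the graphviz_html_string() notes suggest for NLs. The helper below is a hypothetical minimal sketch of that substitution only: it assumes the tooltip text is otherwise already escaped, and it assumes Graphviz honors these entities in HTML-label tooltips; it is not graphtik's quote_html_tooltips().

```python
def quote_tooltip_sketch(s: str) -> str:
    """Hypothetical helper: keep NLs & TABs by encoding them as XML character entities."""
    # Assumes `s` was already HTML-escaped; only whitespace control chars are replaced.
    return s.replace("\n", "&#10;").replace("\t", "&#9;")


print(quote_tooltip_sketch("needs:\ta\nprovides:\tb"))
# needs:&#9;a&#10;provides:&#9;b
```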

graphtik.plot.quote_node_id(s)[source]

See graphviz_html_string()

graphtik.plot.remerge(*containers, source_map: list = None)[source]

Recursively merge dicts and extend lists with boltons.iterutils.remap(), raising on type conflicts, i.e. a list needs a list, etc., unless one of them is None, which is ignored.

Parameters
  • containers – a list of dicts or lists to merge; later ones take precedence (last-wins). If source_map is given, these must be 2-tuples of (name: container).

  • source_map

    If given, it must be a dictionary, and containers arg must be 2-tuples like (name: container). The source_map will be populated with mappings between path and the name of the container it came from.

    Warning

    If source_map is given, the order of input dictionaries is NOT preserved in the results (important if your code relies on PY3.7 stable dictionaries).

Returns

returns a new, merged top-level container.

Example

>>> defaults = {
...     'subdict': {
...         'as_is': 'hi',
...         'overridden_key1': 'value_from_defaults',
...         'overridden_key2': 2222,
...         'merged_list': ['hi', {'untouched_subdict': 'v1'}],
...     }
... }
>>> overrides = {
...     'subdict': {
...         'overridden_key1': 'overridden value',
...         'overridden_key2': 5555,
...         'merged_list': ['there'],
...     }
... }
>>> from graphtik.plot import remerge
>>> source_map = {}
>>> remerge(
...     ("defaults", defaults),
...     ("overrides", overrides),
...     source_map=source_map)
{'subdict': {'as_is': 'hi',
             'overridden_key1': 'overridden value',
             'merged_list': ['hi', {'untouched_subdict': 'v1'}, 'there'],
             'overridden_key2': 5555}}
>>> source_map
{('subdict', 'as_is'): 'defaults',
 ('subdict', 'overridden_key1'): 'overrides',
 ('subdict', 'merged_list'):  ['defaults', 'overrides'],
 ('subdict',): 'overrides',
 ('subdict', 'overridden_key2'): 'overrides'}
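The merge rules above (dicts merged recursively, lists extended, last scalar wins, Nones ignored, type conflicts raising) can be expressed without boltons as a pairwise fold. The following is a simplified sketch under those rules only: it has no source_map support, and the names _merge2 and remerge_sketch are hypothetical.

```python
import copy
from functools import reduce
from typing import Any


def _merge2(old: Any, new: Any) -> Any:
    if old is None:
        return copy.deepcopy(new)  # first value seen: copy it, so inputs stay untouched
    if new is None:
        return old  # Nones are ignored
    if isinstance(old, dict) and isinstance(new, dict):
        merged = dict(old)
        for key, value in new.items():
            merged[key] = _merge2(merged.get(key), value)
        return merged
    if isinstance(old, list) and isinstance(new, list):
        return old + copy.deepcopy(new)  # lists are extended, not replaced
    if isinstance(old, (dict, list)) or isinstance(new, (dict, list)):
        raise TypeError(f"cannot merge {type(old).__name__} with {type(new).__name__}")
    return new  # scalars: the last one wins


def remerge_sketch(*containers: Any) -> Any:
    """Fold all containers left-to-right, later ones taking precedence."""
    return reduce(_merge2, containers, None)


print(remerge_sketch({"style": ["filled"], "color": "red"}, {"style": ["dashed"], "color": "blue"}))
# {'style': ['filled', 'dashed'], 'color': 'blue'}
```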
graphtik.plot.set_active_plotter(plotter: graphtik.plot.Plotter)[source]

The default instance to render plottables, unless overridden with a plotter argument in Plottable.plot().

Parameters

plotter – the plotter instance to install

graphtik.plot.supported_plot_formats() → List[str][source]

Return all pydot extensions, collected automatically.

Module: config

configurations for network execution, and utilities on them.

See also

methods plot.active_plotter_plugged(), plot.set_active_plotter(), plot.get_active_plotter()

Plot configurations are not defined here, so as not to pollute the import space early, before they are actually needed.

Note

The context-manager functions XXX_plugged() or XXX_enabled() do NOT run their code blocks using contextvars.Context.run() in a separate “context”, so any changes to these or other context-vars will persist (unless they are also made within such context-managers).
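The note above can be demonstrated with the standard library alone. A context-manager that sets a context-var and resets it via its token restores only that variable; any other context-var changed inside the with-block keeps its new value, whereas code launched with Context.run() on a copied context does not leak its changes. The variables debug and other, and the helper debug_enabled_sketch, are hypothetical stand-ins, not graphtik's config functions.

```python
import contextvars
from contextlib import contextmanager

debug = contextvars.ContextVar("debug", default=False)  # hypothetical config var
other = contextvars.ContextVar("other", default=0)      # hypothetical unrelated var


@contextmanager
def debug_enabled_sketch(enabled=True):
    """Hypothetical XXX_enabled()-style helper: set a var, reset it on exit."""
    token = debug.set(enabled)
    try:
        yield
    finally:
        debug.reset(token)


with debug_enabled_sketch():
    other.set(42)  # NOT reset: the block does not run inside Context.run()

print(debug.get(), other.get())  # False 42


def set_other():
    other.set(7)


contextvars.copy_context().run(set_other)  # changes stay inside the copied context
print(other.get())  # 42
```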

graphtik.config.abort_run()[source]

Sets the abort run global flag, to halt all currently executing and future plans.

This global flag is reset when any Pipeline.compute() is executed, or manually, by calling reset_abort().
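A global abort flag like this is typically checked between execution steps, so that setting it from any thread (or from inside an operation) halts the remaining steps. The sketch below models the flag with a threading.Event; all names here are hypothetical, and raising RuntimeError is an arbitrary choice for the illustration, not graphtik's actual error type.

```python
import threading

_abort = threading.Event()  # hypothetical module-level abort flag


def abort_run_sketch() -> None:
    _abort.set()


def reset_abort_sketch() -> None:
    _abort.clear()


def is_abort_sketch() -> bool:
    return _abort.is_set()


def execute_steps_sketch(steps) -> list:
    """Run callables in order, checking the flag before each step."""
    done = []
    for step in steps:
        if is_abort_sketch():
            raise RuntimeError("execution aborted")  # arbitrary error for the sketch
        done.append(step())
    return done


reset_abort_sketch()
try:
    execute_steps_sketch([lambda: 1, abort_run_sketch, lambda: 3])
except RuntimeError as ex:
    print(ex)  # execution aborted (the 3rd step never ran)
```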

graphtik.config.debug_enabled(*args, **kwds)[source]

Like set_debug() as a context-manager, resetting back to old value.

See also

disclaimer about context-managers at the top of this config module.

graphtik.config.evictions_skipped(*args, **kwds)[source]

Like set_skip_evictions() as a context-manager, resetting back to old value.

See also

disclaimer about context-managers at the top of this config module.

graphtik.config.execution_pool_plugged(pool: Optional[Pool])[source]

Like set_execution_pool() as a context-manager, resetting back to old value.

See also

disclaimer about context-managers at the top of this config module.

graphtik.config.get_execution_pool() → Optional[Pool]

Get the process-pool for parallel plan executions.

graphtik.config.is_abort()

Return True if networks have been signaled to stop execution.

graphtik.config.is_debug() → Optional[bool]

see set_debug()

graphtik.config.is_endure_operations() → Optional[bool]

see set_endure_operations()

graphtik.config.is_layered_solution() → Optional[bool]

see set_layered_solution()

graphtik.config.is_marshal_tasks() → Optional[bool]

see set_marshal_tasks()

graphtik.config.is_parallel_tasks() → Optional[bool]

see set_parallel_tasks()

graphtik.config.is_reschedule_operations() → Optional[bool]

see set_reschedule_operations()

graphtik.config.is_skip_evictions() → Optional[bool]

see set_skip_evictions()

graphtik.config.operations_endured(*args, **kwds)

Like set_endure_operations(), but as a context-manager that resets the old value on exit.

See also

The disclaimer about context-managers at the top of this config module.

graphtik.config.operations_reschedullled(*args, **kwds)

Like set_reschedule_operations(), but as a context-manager that resets the old value on exit.

See also

The disclaimer about context-managers at the top of this config module.

graphtik.config.reset_abort()

Reset the abort run global flag, to permit plan executions to proceed.

graphtik.config.set_debug(*args, **kwds)

When true, increase details on string-representation of network objects and errors.

Parameters

enabled

  • None, False, string(0, false, off, no): Disabled

  • TODO: 1: Enable ALL DEBUG_XXX

  • TODO: integers: Enable respective DEBUG_XXX bit-field constants

  • anything else: Enable ALL DEBUG_XXX

Affected behavior:

  • FnOp.compute() prints out full given-inputs (not just their keys);

  • net objects print details recursively;

  • plotted SVG diagrams include style-provenance as tooltips;

  • Sphinx extension also saves the original DOT file next to each image (see graphtik_save_dot_files).

Note

The default is controlled with GRAPHTIK_DEBUG environment variable.

Note that enabling this flag is different from enabling logging in DEBUG, since it affects all code (e.g. interactive printing in a debugger session, exceptions, doctests), not just debug statements (which are also affected by this flag).

Returns

a “reset” token (see ContextVar.set())

graphtik.config.set_endure_operations(*args, **kwds)

Globally enable/disable endurance, to keep executing even if some operations fail.

Parameters

enable

  • If None (default), respect the flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())


graphtik.config.set_execution_pool(pool: Optional[Pool])

Set the process-pool for parallel plan executions.

You may also have to call set_marshal_tasks() to resolve pickling issues.

graphtik.config.set_layered_solution(*args, **kwds)

Whether to store operation results into a separate solution layer.

Parameters

enable

  • If None (default), results are layered only if there are NO jsonp dependencies in the network.

  • If True (False), always keep (don’t keep) results in a separate layer for each operation, regardless of any jsonp dependencies.

It overrides any param given when executing a pipeline or a plan.

Returns

a “reset” token (see ContextVar.set())
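To picture what a “layer” per operation means, here is a minimal sketch using collections.ChainMap; the layout (inputs at the bottom, each operation's results pushed as a new child map) is an assumption for illustration, not graphtik's Solution class:

```python
from collections import ChainMap

# Given inputs form the bottom layer of this hypothetical solution.
inputs = {"a": 1, "b": 2}
solution = ChainMap(inputs)

# Each operation's results go into their own new layer, which is searched first.
op_results = [("add", {"ab": 3}), ("double", {"ab": 6, "c": 12})]
layers = {}
for op_name, outputs in op_results:
    layers[op_name] = outputs
    solution = solution.new_child(outputs)

print(solution["ab"])       # 6: the latest layer wins
print(layers["add"]["ab"])  # 3: earlier results survive in their own layer
```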

graphtik.config.set_marshal_tasks(*args, **kwds)

Globally enable/disable marshalling, with dill, of the inputs & outputs of parallel operations, which might help with pickling problems.

Parameters

enable

  • If None (default), respect the respective flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())
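Tasks sent to a process pool must be pickled, and the standard pickle module serializes functions only by reference to an importable name, so lambdas and many closures fail. The third-party dill package can serialize many such objects, which is why marshalling with it may help. The `picklable()` helper below is hypothetical and only probes the standard pickle module:

```python
import operator
import pickle


def picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(picklable(operator.add))        # True: pickled by reference to its module
print(picklable(lambda a, b: a + b))  # False: a lambda cannot be looked up by name
```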

graphtik.config.set_parallel_tasks(*args, **kwds)

Globally enable/disable parallel execution of operations.

Parameters

enable

  • If None (default), respect the respective flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

graphtik.config.set_reschedule_operations(*args, **kwds)

Globally enable/disable rescheduling for operations returning only partial outputs.

Parameters

enable

  • If None (default), respect the flag on each operation;

  • If true/false, force it for all operations.

Returns

a “reset” token (see ContextVar.set())

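When a rescheduled operation returns only some of its declared provides, whatever depends on the missing ones can no longer run. A minimal sketch of that bookkeeping, assuming such downstream operations are simply skipped; the `network` dict and `still_runnable()` helper are hypothetical:

```python
# Hypothetical network: operation name -> (needs, provides).
network = {
    "split": (["text"], ["head", "tail"]),
    "use_head": (["head"], ["h2"]),
    "use_tail": (["tail"], ["t2"]),
}


def still_runnable(network, available, skip=()):
    """Operations (besides `skip`) whose needs are all among the available values."""
    return [
        name
        for name, (needs, _provides) in network.items()
        if name not in skip and all(n in available for n in needs)
    ]


given = {"text": "Hello world"}
partial = {"head": "Hello"}  # "split" returned only 1 of its 2 declared provides

missing = set(network["split"][1]) - partial.keys()
print(sorted(missing))  # ['tail']
print(still_runnable(network, given.keys() | partial.keys(), skip={"split"}))  # ['use_head']
```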

graphtik.config.set_skip_evictions(*args, **kwds)

When true, globally disable evictions, to keep all intermediate solution values, regardless of asked outputs.

Returns

a “reset” token (see ContextVar.set())
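A simplified sketch of the effect: it evicts everything not asked once, at the end, whereas a plan interleaves evict-instructions among its steps to free memory as soon as a value is no longer needed. The `evict()` helper, and the choice to evict nothing when no outputs are asked, are assumptions of this sketch:

```python
def evict(solution: dict, asked_outputs, skip_evictions=False) -> dict:
    """Drop values that were not asked for, unless evictions are skipped."""
    if skip_evictions or not asked_outputs:
        return dict(solution)
    return {k: v for k, v in solution.items() if k in asked_outputs}


values = {"a": 1, "ab": 3, "asked": 7}
print(evict(values, ["asked"]))                       # {'asked': 7}
print(evict(values, ["asked"], skip_evictions=True))  # {'a': 1, 'ab': 3, 'asked': 7}
```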

graphtik.config.solution_layered(*args, **kwds)

Like set_layered_solution(), but as a context-manager that resets the old value on exit.

See also

The disclaimer about context-managers at the top of this config module.

graphtik.config.tasks_in_parallel(*args, **kwds)

Like set_parallel_tasks(), but as a context-manager that resets the old value on exit.

See also

The disclaimer about context-managers at the top of this config module.

graphtik.config.tasks_marshalled(*args, **kwds)

Like set_marshal_tasks(), but as a context-manager that resets the old value on exit.

See also

The disclaimer about context-managers at the top of this config module.

Module: base

Generic utilities, exceptions and operation & plottable base classes.

exception graphtik.base.AbortedException

Raised from Network when abort_run() is called; it contains the solution with any values populated so far.

exception graphtik.base.IncompleteExecutionError

Reported when any endured/rescheduled operations were canceled.

The exception contains 3 arguments:

  1. the causal errors and conditions (1st arg),

  2. the list of collected exceptions (2nd arg), and

  3. the solution instance (3rd argument), to interrogate for more.

Returned by check_if_incomplete() or raised by scream_if_incomplete().
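Because the 3 documented values are positional exception arguments, a handler can unpack them straight from `ex.args`. A minimal sketch with a hypothetical stand-in class; using a plain message string as the 1st argument is an assumption:

```python
class IncompleteSketch(Exception):
    """Hypothetical stand-in for IncompleteExecutionError."""


def finish(solution: dict, errors: list):
    """Raise with the 3 documented args if any operation failed or was canceled."""
    if errors:
        raise IncompleteSketch(f"{len(errors)} operation(s) failed", errors, solution)
    return solution


try:
    finish({"a": 1}, [ValueError("bad input")])
except IncompleteSketch as ex:
    causes, collected, solution = ex.args  # unpack (causes, errors, solution)

print(causes)  # 1 operation(s) failed
```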

class graphtik.base.Operation

An abstract class representing an action with compute().

abstract compute(named_inputs, outputs=None)

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns list

Should return a list of values representing the results of running the feed-forward computation on inputs.
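A minimal sketch of the contract described above, using the standard abc module; `OperationSketch` and `Adder` are hypothetical classes, and the `__call__` forwarding kwargs to compute() is an assumption modeled on the "call the operation with named_inputs as kwargs" advice:

```python
from abc import ABC, abstractmethod


class OperationSketch(ABC):
    """Hypothetical mirror of the abstract Operation interface."""

    name: str

    @abstractmethod
    def compute(self, named_inputs, outputs=None):
        """Return a list of results for the given named_inputs."""

    def __call__(self, **named_inputs):
        # End-users pass the named inputs as kwargs.
        return self.compute(named_inputs)


class Adder(OperationSketch):
    name = "add"

    def compute(self, named_inputs, outputs=None):
        return [named_inputs["a"] + named_inputs["b"]]


print(Adder()(a=1, b=2))  # [3]
```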

name: str
needs: Items
op_needs: Items
op_provides: Items
prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs

Delegate to a provisional network with a single op.

provides: Items
class graphtik.base.PlotArgs(plottable: Plottable = None, graph: nx.Graph = None, name: str = None, steps: Collection = None, inputs: Collection = None, outputs: Collection = None, solution: graphtik.planning.Solution = None, clusters: Mapping = None, plotter: graphtik.plot.Plotter = None, theme: graphtik.plot.Theme = None, dot: pydot.Dot = None, nx_item: Any = None, nx_attrs: dict = None, dot_item: Any = None, clustered: dict = None, jupyter_render: Mapping = None, filename: Union[str, bool, int] = None)

All the args of a Plottable.plot() call; check that method for a more detailed explanation of its attributes.

clone_or_merge_graph(base_graph) → graphtik.base.PlotArgs

Overlay graph over base_graph, or clone base_graph if there is no graph attribute.

Returns

the updated plot_args

property clustered

Collect the actual clustered dot_nodes among the given nodes.

property clusters

Either a mapping of node-names to dot(.)-separated cluster-names, or false/true to enable plotter’s default clustering of nodes based on their dot-separated name parts.

Note that if it’s None (default), the plotter will cluster based on node-names, BUT the Plan may replace the None with a dictionary with the “pruned” cluster (when its dag differs from network’s graph); to suppress the pruned-cluster, pass a truthy, NON-dictionary value.

property dot

Where to add graphviz nodes & stuff.

property dot_item

The pydot-node/edge created

property filename

where to write image or show in a matplotlib window

property graph

what to plot (or the “overlay” when calling Plottable.plot())

property inputs

the list of input names.

property jupyter_render

jupyter configuration overrides

property kw_render_pydot
property name

The name of the graph in the dot-file (important for cmaps).

property nx_attrs

Attributes gotten from nx-graph for the given graph/node/edge. They are NOT a clone, so any modifications affect the nx graph.

property nx_item

The node (data(str) or Operation) or edge as gotten from nx-graph.

property outputs

the list of output names.

property plottable

who is the caller

property plotter

If given, overrides the active plotter.

property solution

Contains the computed results, which might be different from plottable.

property steps

the list of execution plan steps.

property theme

If given, overrides the plot theme the plotter will use. It can be any mapping, in which case it overrides the current theme.

with_defaults(*args, **kw) → graphtik.base.PlotArgs

Replace only fields with None values.
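The “fill only None fields” rule can be sketched with a typing.NamedTuple and its `_replace()` method. `PlotArgsSketch` is a hypothetical 3-field stand-in, modeling PlotArgs as a NamedTuple is an assumption, and the sketch handles keyword defaults only:

```python
from typing import NamedTuple, Optional


class PlotArgsSketch(NamedTuple):
    """Hypothetical 3-field stand-in for PlotArgs."""

    name: Optional[str] = None
    inputs: Optional[list] = None
    theme: Optional[dict] = None

    def with_defaults(self, **defaults) -> "PlotArgsSketch":
        """Fill only the fields that are still None; fields already set are kept."""
        fill = {k: v for k, v in defaults.items() if getattr(self, k) is None}
        return self._replace(**fill)


args = PlotArgsSketch(name="mine")
print(args.with_defaults(name="G", inputs=["a"]))
# PlotArgsSketch(name='mine', inputs=['a'], theme=None)
```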

class graphtik.base.Plottable

plottable capabilities and graph props for all major classes of the project.

Classes wishing to plot their graphs should inherit this and implement property plot to return a “partial” callable that somehow ends up calling plot.render_pydot() with the graph or any other args bound appropriately. The purpose is to avoid copying this function & documentation here around.

property data

A new list with all data nodes contained in the network.

find_op_by_name(name) → Optional[graphtik.base.Operation]

Fetch the 1st operation named with the given name.

find_ops(predicate) → List[graphtik.base.Operation]

Scan operation nodes and fetch those satisfying predicate.

Parameters

predicate – the node predicate is a 2-argument callable(op, node-data) that should return true for nodes to include.
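A self-contained sketch of the 2-argument predicate, with the graph modeled as a plain `{node: node-data}` dict; networkx yields the same `(node, data)` pairs from `graph.nodes(data=True)`. The `Op` class and both helper functions are hypothetical:

```python
class Op:
    """Hypothetical operation node with just a name."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"Op({self.name!r})"


# Hypothetical graph as {node: node-data}; data nodes are plain strings.
nodes = {Op("add"): {"color": "red"}, "a": {}, Op("sub"): {}}


def find_ops(nodes, predicate):
    """Operation nodes for which predicate(op, node_data) is true."""
    return [n for n, data in nodes.items() if isinstance(n, Op) and predicate(n, data)]


def find_op_by_name(nodes, name):
    """The 1st operation with the given name, or None."""
    return next((op for op in find_ops(nodes, lambda op, _: True) if op.name == name), None)


print(find_ops(nodes, lambda op, data: "color" in data))  # [Op('add')]
print(find_op_by_name(nodes, "sub"))                      # Op('sub')
```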

graph: ‘networkx.Graph’
property ops

A new list with all operations contained in the network.

plot(filename: Union[str, bool, int] = None, show=None, *, plotter: graphtik.plot.Plotter = None, theme: graphtik.plot.Theme = None, graph: networkx.Graph = None, name=None, steps=None, inputs=None, outputs=None, solution: graphtik.planning.Solution = None, clusters: Mapping = None, jupyter_render: Union[None, Mapping, str] = None) → pydot.Dot

Entry-point for plotting ready made operation graphs.

Parameters
  • filename (str) –

    Write a file or open a matplotlib window.

    • If it is a string or file, the diagram is written into the file-path.

      Common extensions are .png, .dot, .jpg, .jpeg, .pdf and .svg; call plot.supported_plot_formats() for more.

    • If it IS True, opens the diagram in a matplotlib window (requires matplotlib package to be installed).

    • If it equals -1, it mat-plots but does not open the window.

    • Otherwise, just return the pydot.Dot instance.

    seealso

    PlotArgs.filename, Plotter.render_pydot()

  • plottable

    the plottable that ordered the plotting. Automatically set downstreams to one of:

    op | pipeline | net | plan | solution | <missing>
    
    seealso

    PlotArgs.plottable

  • plotter

    the plotter to handle plotting; if none, the active plotter is used by default.

    seealso

    PlotArgs.plotter

  • theme

    Any plot theme or dictionary overrides; if none, the Plotter.default_theme of the active plotter is used.

    seealso

    PlotArgs.theme

  • name

    if not given, the dot-lang graph is named “G”; a unique name is necessary when referring to generated CMAPs. No need to quote it; quoting is handled by the plotter, downstream.

    seealso

    PlotArgs.name

  • graph (networkx.Graph) –

    (optional) An nx.DiGraph with overrides to merge with the graph provided by underlying plottables (translated by the active plotter).

    It may contain graph, node & edge attributes for any usage, but these conventions apply:

    'graphviz.xxx' (graph/node/edge attributes)

    Any “user-overrides” with this prefix are sent verbatim as Graphviz attributes.

    Note

    Remember to escape those values as Graphviz HTML-Like strings (use plot.graphviz_html_string()).

    no_plot (node/edge attribute)

    element skipped from plotting (see “Examples:” section, below)

    seealso

    PlotArgs.graph

  • inputs

    an optional name list, any nodes in there are plotted as a “house”

    seealso

    PlotArgs.inputs

  • outputs

    an optional name list, any nodes in there are plotted as an “inverted-house”

    seealso

    PlotArgs.outputs

  • solution

    an optional dict with values to annotate nodes, drawn “filled” (currently the content is not shown, but the node is drawn as “filled”). It extracts more info from a Solution instance; e.g. if solution has an executed attribute, the operations contained in it are drawn as “filled”.

    seealso

    PlotArgs.solution

  • clusters

    Either a mapping, or false/true to enable plotter’s default clustering of nodes based on their dot-separated name parts.

    Note that if it’s None (default), the plotter will cluster based on node-names, BUT the Plan may replace the None with a dictionary with the “pruned” cluster (when its dag differs from network’s graph); to suppress the pruned-cluster, pass a truthy, NON-dictionary value.

    Practically, when it is a:

    • dictionary of node-names –> dot(.)-separated cluster-names, it is respected, even if empty;

    • truthy: cluster based on dot(.)-separated node-name parts;

    • falsy: don’t cluster at all.

    seealso

    PlotArgs.clusters

  • jupyter_render

    a nested dictionary controlling the rendering of graph-plots in Jupyter cells, if None, defaults to jupyter_render; you may modify it in place and apply for all future calls (see Jupyter notebooks).

    seealso

    PlotArgs.jupyter_render

  • show

    Deprecated since version v6.1.1: Merged with filename param (filename takes precedence).

Returns

a pydot.Dot instance (for a reference to an API similar to that of pydot.Dot, visit: https://pydotplus.readthedocs.io/reference.html#pydotplus.graphviz.Dot)

The pydot.Dot instance returned is rendered directly in Jupyter/IPython notebooks as SVG images (see Jupyter notebooks).

Note that the graph argument cannot replace the plottable’s own graph (each Plottable provides its own graph internally, and graph is only merged as an overlay); use render_pydot() directly to provide a different graph.

Graphtik Legend

NODES:

  • oval – function

  • egg – subgraph operation

  • house – given input

  • inverted-house – asked output

  • polygon – given both as input & asked as output (what?)

  • square – intermediate data, neither given nor asked.

  • red frame – evict-instruction, to free up memory.

  • filled – data node has a value in solution OR function has been executed.

  • thick frame – function/data node in execution steps.

ARROWS:

  • solid black arrows – dependencies (source-data needed by target-operations, source-operations providing target-data)

  • dashed black arrows – optional needs

  • blue arrows – sideffect needs/provides

  • wheat arrows – broken dependency (provide) during pruning

  • green-dotted arrows – execution steps labeled in succession

To generate the legend, see legend().

Examples:

>>> from graphtik import compose, operation
>>> from graphtik.modifier import optional
>>> from operator import add
>>> pipeline = compose("pipeline",
...     operation(name="add", needs=["a", "b1"], provides=["ab1"])(add),
...     operation(name="sub", needs=["a", optional("b2")], provides=["ab2"])(lambda a, b=1: a-b),
...     operation(name="abb", needs=["ab1", "ab2"], provides=["asked"])(add),
... )
>>> pipeline.plot(True);                       # plot just the graph in a matplotlib window # doctest: +SKIP
>>> inputs = {'a': 1, 'b1': 2}
>>> solution = pipeline(**inputs)              # now plots will include the execution-plan

The solution is also plottable:

>>> solution.plot('plot1.svg');                                                             # doctest: +SKIP

or you may augment the pipeline with the requested inputs/outputs & solution:

>>> pipeline.plot('plot1.svg', inputs=inputs, outputs=['asked', 'b1'], solution=solution);  # doctest: +SKIP

In any case you may get the pydot.Dot object (n.b. it is renderable in Jupyter as-is):

>>> dot = pipeline.plot(solution=solution);
>>> print(dot)
digraph pipeline {
fontname=italic;
label=<pipeline>;
<a> [fillcolor=wheat, label=a, margin="0.04,0.02", shape=invhouse, style=filled, tooltip="(int) 1"];
...

You may use the PlotArgs.graph overlay to skip certain nodes (or edges) from the plots:

>>> import networkx as nx
>>> g = nx.DiGraph()  # the overlay
>>> to_hide = pipeline.net.find_op_by_name("sub")
>>> g.add_node(to_hide, no_plot=True)
>>> dot = pipeline.plot(graph=g)
>>> assert "<sub>" not in str(dot), str(dot)

abstract prepare_plot_args(plot_args: graphtik.base.PlotArgs) → graphtik.base.PlotArgs

Called by plot() to create the nx-graph and other plot-args, e.g. solution.

class graphtik.base.RenArgs(typ: str, op: Operation, name: str, parent: Pipeline = None)

Arguments received by callbacks in rename() and operation nesting.

property name

Alias for field number 2

property op

the operation currently being processed

property parent

The parent Pipeline of the operation currently being processed. It has a value only when doing operation nesting from compose().

property typ

what is currently being renamed; one of the strings:

op
need.jsonpart
need
provide.jsonpart
provide
alias.jsonpart
alias

Any jsonp parts are renamed prior to the full path (as ordered above).
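A rename callback can dispatch on `typ` to rename only some kinds of items. A minimal sketch, assuming the callback returns the new name; `RenArgsSketch` is a hypothetical mirror of the 4 RenArgs fields and `prefix_provides()` a hypothetical callback:

```python
from typing import Any, NamedTuple, Optional


class RenArgsSketch(NamedTuple):
    """Hypothetical mirror of RenArgs(typ, op, name, parent)."""

    typ: str
    op: Any
    name: str
    parent: Optional[Any] = None


def prefix_provides(ren: RenArgsSketch) -> str:
    """Rename callback: prefix only provided data, keep every other name."""
    if ren.typ == "provide":
        return f"out.{ren.name}"
    return ren.name


print(prefix_provides(RenArgsSketch("provide", None, "sum")))  # out.sum
print(prefix_provides(RenArgsSketch("need", None, "a")))       # a
```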

class graphtik.base.Token(s)

Guarantee equality, not(!) identity, across processes.

hashid
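Why a string-based token keeps equality across processes, while an `object()` sentinel does not, can be seen with a pickle round-trip, which is how values travel to worker processes. `TokenSketch` is a hypothetical str subclass, not graphtik's Token:

```python
import pickle


class TokenSketch(str):
    """Hypothetical str-based sentinel, compared (and hashed) by its string value."""

    __slots__ = ()

    def __repr__(self):
        return f"TokenSketch({str(self)!r})"


plain = object()
token = TokenSketch("abort")

# A pickle round-trip stands in for crossing a process boundary.
plain_copy = pickle.loads(pickle.dumps(plain))
token_copy = pickle.loads(pickle.dumps(token))

print(plain_copy == plain)  # False: object() sentinels compare by identity
print(token_copy == token)  # True: compared by string value
print(token_copy is token)  # False: equality, not identity
```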
graphtik.base.aslist(i, argname, allowed_types=<class 'list'>)

Utility to accept singular strings as lists, and None –> [].
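A minimal sketch of the documented behavior (hypothetical, not graphtik's source): a single string becomes a 1-item list, None becomes an empty list, allowed types pass through, and other iterables are converted:

```python
def aslist_sketch(i, argname, allowed_types=list):
    """Accept a single string as a 1-item list, and None as []."""
    if i is None:
        return []
    if isinstance(i, str):
        return [i]
    if isinstance(i, allowed_types):
        return i
    try:
        return list(i)
    except TypeError as ex:
        raise ValueError(f"{argname}: cannot convert {i!r} to a list") from ex


print(aslist_sketch("a", "needs"))         # ['a']
print(aslist_sketch(None, "needs"))        # []
print(aslist_sketch(("a", "b"), "needs"))  # ['a', 'b']
```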

graphtik.base.astuple(i, argname, allowed_types=<class 'tuple'>)
graphtik.base.first_solid(*tristates, default=None)

Utility combining multiple tri-state booleans.
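Tri-state booleans (None meaning “unset”) are typically combined by taking the first value that is actually set, e.g. a global override before a per-operation flag. A minimal sketch under that assumed semantics; `first_solid_sketch()` is hypothetical:

```python
def first_solid_sketch(*tristates, default=None):
    """Return the first value that is not None, else `default` (assumed semantics)."""
    return next((t for t in tristates if t is not None), default)


# e.g. global override (unset), then the operation's own flag, then a fallback:
print(first_solid_sketch(None, False, True))         # False
print(first_solid_sketch(None, None, default=True))  # True
```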

graphtik.base.func_name(fn, default=Ellipsis, mod=None, fqdn=None, human=None, partials=None) → Optional[str]

FQDN of fn, descending into partials to print their args.

Parameters
  • default – What to return if it fails; by default it raises.

  • mod – when true, prepend module like module.name.fn_name

  • fqdn – when true, use __qualname__ (instead of __name__) which differs mostly on methods, where it contains class(es), and locals, respectively (PEP 3155). Sphinx uses fqdn=True for generating IDs.

  • human – when true, explain built-ins, and assume partials=True (if that was None)

  • partials – when true (or omitted & human true), partials denote their args like fn({"a": 1}, ...)

Returns

a (possibly dot-separated) string, or default (unless this is ...).

Raises

Only if default is ...; otherwise, errors are debug-logged.

Examples

>>> func_name(func_name)
'func_name'
>>> func_name(func_name, mod=1)
'graphtik.base.func_name'
>>> func_name(func_name.__format__, fqdn=0)
'__format__'
>>> func_name(func_name.__format__, fqdn=1)
'function.__format__'

Even functions defined in docstrings are reported:

>>> def f():
...     def inner():
...         pass
...     return inner
>>> func_name(f, mod=1, fqdn=1)
'graphtik.base.f'
>>> func_name(f(), fqdn=1)
'f.<locals>.inner'

On failures, arg default controls the outcomes:

TBD

graphtik.base.func_source(fn, *args, **kw) → Optional[Tuple[str, int]]

Like inspect.getsource() supporting partials.

Parameters
  • default – If given, better be a 2-tuple respecting types, or ..., to raise.

  • human – when true, denote builtins like python does

graphtik.base.func_sourcelines(fn, *args, **kw) → Optional[Tuple[str, int]]

Like inspect.getsourcelines() supporting partials.

Parameters

default – If given, better be a 2-tuple respecting types, or ..., to raise.

graphtik.base.jetsam(ex, locs, *salvage_vars: str, annotation='jetsam', **salvage_mappings)

Annotate exception with salvaged values from locals() and raise!

Parameters
  • ex – the exception to annotate

  • locs

    locals() from the context-manager’s block containing vars to be salvaged in case of exception

    ATTENTION: wrapped function must finally call locals(), because locals dictionary only reflects local-var changes after call.

  • annotation – the name of the attribute to attach on the exception

  • salvage_vars – local variable names to save as is in the salvaged annotations dictionary.

  • salvage_mappings – a mapping of destination-annotation-keys –> source-locals-keys; if a source is callable, the value to salvage is retrieved by calling value(locs). They take precedence over salvage_vars.

Raises

any exception raised by the wrapped function, annotated with values assigned as attributes on this context-manager

  • Any attributes attached on this manager are attached on the raised exception, as a new jetsam attribute with a dict as value.

  • If the exception is already annotated, any new items are inserted, but existing ones are preserved.

Example:

Call it with managed-block’s locals() and tell which of them to salvage in case of errors:

try:
    a = 1
    b = 2
    raise Exception()
except Exception as ex:
    jetsam(ex, locals(), "a", salvaged_b="b", c_var="c")
    raise

And then from a REPL:

import sys
sys.last_value.jetsam
{'a': 1, 'salvaged_b': 2, 'c_var': None}

Reason:

Graphs may become arbitrarily deep. Debugging such graphs is notoriously hard.

The purpose is not to require a debugger-session to inspect the root-causes (without precluding one).

Naively salvaging values with a simple try/except block around each function blocks the debugger from landing on the real cause of the error: it would land on that block instead, which could be many nested levels above it.
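The salvaging mechanics documented above can be sketched with plain exception attributes. `salvage()` is a hypothetical, simplified stand-in for jetsam(): it attaches selected locals as a dict, lets destination-to-source mappings take precedence over plain variable names, and keeps values already annotated by inner frames:

```python
def salvage(ex, locs, *salvage_vars, annotation="jetsam", **salvage_mappings):
    """Hypothetical, simplified jetsam: attach selected locals to `ex` as a dict."""
    salvaged = getattr(ex, annotation, None)
    if not isinstance(salvaged, dict):
        salvaged = {}
        setattr(ex, annotation, salvaged)
    items = {v: v for v in salvage_vars}
    items.update(salvage_mappings)  # destination -> source; mappings win
    for dst, src in items.items():
        value = src(locs) if callable(src) else locs.get(src)
        salvaged.setdefault(dst, value)  # preserve existing annotations
    return ex


def work():
    a, b = 1, 2
    try:
        raise ValueError("boom")
    except ValueError as ex:
        salvage(ex, locals(), "a", salvaged_b="b", c_var="c")
        raise


try:
    work()
except ValueError as err:
    salvaged = err.jetsam

print(salvaged)  # {'a': 1, 'salvaged_b': 2, 'c_var': None}
```

Because the exception is re-raised unchanged, a debugger still lands on the original `raise` site, while the salvaged values remain available afterwards.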

Module: sphinxext

Extends Sphinx with graphtik directive for plotting from doctest code.

class graphtik.sphinxext.DocFilesPurgatory

Keeps 2-way associations of docs <–> abs-files, to purge them.

doc_fpaths: Dict[str, Set[Path]]

Multi-map: docnames -> abs-file-paths

purge_doc(docname: str)

Remove doc-files not used by any other doc.

register_doc_fpath(docname: str, fpath: pathlib.Path)

Must be absolute, for purging to work.
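The purging rule (remove a doc's files only if no other doc still uses them) can be sketched with a multi-map. `PurgatorySketch` is hypothetical: it derives the reverse direction on the fly instead of storing it, and returns the removable paths instead of deleting files:

```python
from collections import defaultdict
from pathlib import Path
from typing import Dict, Set


class PurgatorySketch:
    """Hypothetical multi-map of docnames -> absolute file paths."""

    def __init__(self):
        self.doc_fpaths: Dict[str, Set[Path]] = defaultdict(set)

    def register_doc_fpath(self, docname: str, fpath: Path):
        if not fpath.is_absolute():  # relative paths could not be compared reliably
            raise ValueError(f"not absolute: {fpath}")
        self.doc_fpaths[docname].add(fpath)

    def purge_doc(self, docname: str) -> Set[Path]:
        """Forget docname; return its files that no other doc still uses."""
        fpaths = self.doc_fpaths.pop(docname, set())
        still_used = set().union(*self.doc_fpaths.values())
        return fpaths - still_used


purg = PurgatorySketch()
shared, own = Path.cwd() / "shared.svg", Path.cwd() / "own.svg"
purg.register_doc_fpath("index", shared)
purg.register_doc_fpath("index", own)
purg.register_doc_fpath("api", shared)
removable = purg.purge_doc("index")
print(sorted(p.name for p in removable))  # ['own.svg']
```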

class graphtik.sphinxext.GraphtikDoctestDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Embeds plots from doctest code (see graphtik).

class graphtik.sphinxext.GraphtikTestoutputDirective(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Like graphtik directive, but emulates doctest testoutput blocks.

class graphtik.sphinxext.dynaimage(rawsource='', *children, **attributes)

Writes a tag in tag attr (<img> for PNGs, <object> for SVGs/PDFs).

class graphtik.sphinxext.graphtik_node(rawsource='', *children, **attributes)

Non-writable node wrapping (literal-block + figure(dynaimage)) nodes.

The figure gets the :name: & :align: options; the contained dynaimage gets the :height:, :width:, :scale: and :classes: options.