1. Operations

An operation is a function in a computation pipeline, abstractly represented by the Operation class. This class specifies the dependencies forming the pipeline’s network.

Defining Operations

You may subclass the Operation abstract class and override its Operation.compute() method to manually do the following:

  • read the values declared as needs from the solution,

  • match those values into function arguments,

  • call your function to do its business,

  • “zip” the function’s results with the operation’s declared provides, and finally

  • hand back those zipped values to the solution for further actions.
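The five steps above can be sketched in plain Python. This is a minimal illustration only: the MiniOp class is hypothetical and is not graphtik's Operation; it assumes needs are fed positionally and that a single provides receives the whole result.

```python
from typing import Any, Callable, Dict, Sequence


class MiniOp:
    """Hypothetical stand-in for an Operation subclass (not graphtik's code)."""

    def __init__(self, fn: Callable, needs: Sequence[str], provides: Sequence[str]):
        self.fn = fn
        self.needs = list(needs)
        self.provides = list(provides)

    def compute(self, solution: Dict[str, Any]) -> Dict[str, Any]:
        # read the values declared as `needs` from the solution
        values = [solution[name] for name in self.needs]
        # match them positionally to the function arguments and call it
        results = self.fn(*values)
        # a single `provides` receives the whole result
        if len(self.provides) == 1:
            results = [results]
        # "zip" the results with the declared `provides`...
        outputs = dict(zip(self.provides, results))
        # ...and hand them back, for the caller to merge into the solution
        return outputs


add_op = MiniOp(lambda a, b: a + b, needs=["a", "b"], provides=["a_plus_b"])
solution = {"a": 3, "b": 4}
solution.update(add_op.compute(solution))
print(solution)  # {'a': 3, 'b': 4, 'a_plus_b': 7}
```

Writing this boilerplate for every function is exactly what the operation() factory spares you.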

But there is an easier way: actually, half of the code in this project is dedicated to retrofitting existing functions, unaware of all this, into operations.

Operations from existing functions

The FnOp provides a concrete lightweight wrapper around any arbitrary function to define and execute within a pipeline. Use the operation() factory to instantiate one:

>>> from operator import add
>>> from graphtik import operation
>>> add_op = operation(add,
...                    needs=['a', 'b'],
...                    provides=['a_plus_b'])
>>> add_op
FnOp(name='add', needs=['a', 'b'], provides=['a_plus_b'], fn='add')

You may still call the original function at FnOp.fn, thus bypassing any operation pre-processing:

>>> add_op.fn(3, 4)
7

But the proper way is to call the operation (either directly or by calling the FnOp.compute() method). Notice though that unnamed positional parameters are not supported:

>>> add_op(a=3, b=4)
{'a_plus_b': 7}

Tip

In case your function needs to access the execution machinery or its wrapping operation, it can do so through the task_context (unstable API, not working during parallel execution; see Accessing wrapper operation from task-context).

Builder pattern

There are two ways to instantiate FnOp instances, each one suitable for different scenarios.

We’ve seen that calling operation() manually allows putting into a pipeline functions that are defined elsewhere (e.g. in another module, or system functions).

But that method is also useful if you want to create multiple operation instances with similar attributes, e.g. needs:

>>> op_factory = operation(needs=['a'])

Notice that we did not specify a fn, so we got back a decorator (and not a FnOp instance); it still supports the withset() method, which builds a new FnOp on each call:

>>> from graphtik import operation, compose
>>> from functools import partial
>>> def mypow(a, p=2):
...    return a ** p
>>> pow_op2 = op_factory.withset(fn=mypow, provides="^2")
>>> pow_op3 = op_factory.withset(fn=partial(mypow, p=3), name='pow_3', provides='^3')
>>> pow_op0 = op_factory.withset(fn=lambda a: 1, name='pow_0', provides='^0')
>>> graphop = compose('powers', pow_op2, pow_op3, pow_op0)
>>> graphop
Pipeline('powers', needs=['a'], provides=['^2', '^3', '^0'], x3 ops:
   mypow, pow_3, pow_0)
>>> graphop(a=2)
{'a': 2, '^2': 4, '^3': 8, '^0': 1}
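The builder pattern resembles "copy with overrides" on an immutable record. The following sketch is an analogy only: OpSpec is a hypothetical frozen dataclass (not graphtik's FnOp), and its withset() merely delegates to the standard dataclasses.replace(), deriving a name from fn.__name__ when none is given.

```python
import dataclasses
from functools import partial
from typing import Callable, Optional, Tuple


@dataclasses.dataclass(frozen=True)
class OpSpec:
    """Hypothetical immutable operation spec (not graphtik's FnOp)."""
    fn: Optional[Callable] = None
    name: Optional[str] = None
    needs: Tuple[str, ...] = ()
    provides: Tuple[str, ...] = ()

    def withset(self, **overrides) -> "OpSpec":
        # return a *new* spec; the shared "factory" spec stays untouched
        new = dataclasses.replace(self, **overrides)
        if new.name is None and new.fn is not None:
            # derive a name from the function, if it has one (partials don't)
            new = dataclasses.replace(new, name=getattr(new.fn, "__name__", None))
        return new


def mypow(a, p=2):
    return a ** p


factory = OpSpec(needs=("a",))
pow2 = factory.withset(fn=mypow, provides=("^2",))
pow3 = factory.withset(fn=partial(mypow, p=3), name="pow_3", provides=("^3",))
pow0 = factory.withset(fn=lambda a: 1, name="pow_0", provides=("^0",))
print(pow2.name, pow2.needs, pow2.provides)  # mypow ('a',) ('^2',)
print(factory.provides)                      # ()
```

Because every call returns a fresh object, the shared needs are declared once while each operation still gets its own fn, name and provides.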

Tip

See Plotting on how to make diagrams like this.

Decorator specification

If you are defining your computation graph and the functions that comprise it all in the same script, the decorator specification of operation instances might be particularly useful, as it allows you to assign computation graph structure to functions as they are defined. Here’s an example:

>>> from graphtik import operation, compose
>>> @operation(needs=['b', 'a', 'r'], provides='bar')
... def foo(a, b, c):
...   return c * (a + b)
>>> graphop = compose('foo_graph', foo)

  • Notice that if name is not given, it is deduced from the function name.
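A decorator factory like this can be sketched in a few lines. In this illustration op_decorator is a hypothetical helper (not graphtik's operation()) that attaches metadata to the function and falls back to fn.__name__ for the name; it also shows that needs are fed by position, so 'b' lands in argument a and 'a' in argument b.

```python
from typing import Callable, Optional, Sequence, Union

Deps = Union[str, Sequence[str]]


def op_decorator(needs: Deps, provides: Deps, name: Optional[str] = None) -> Callable:
    """Hypothetical decorator factory attaching graph metadata to a function."""
    def decorate(fn: Callable) -> Callable:
        # deduce the operation name from the function when not given
        fn.op_name = name or fn.__name__
        fn.needs = [needs] if isinstance(needs, str) else list(needs)
        fn.provides = [provides] if isinstance(provides, str) else list(provides)
        return fn
    return decorate


@op_decorator(needs=["b", "a", "r"], provides="bar")
def foo(a, b, c):
    return c * (a + b)


# needs are fed by position: 'b' -> a, 'a' -> b, 'r' -> c
solution = {"a": 1, "b": 2, "r": 3}
print(foo.op_name, foo(*(solution[n] for n in foo.needs)))  # foo 9
```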

Specifying graph structure: provides and needs

Each operation is a node in a computation graph: to compute, it depends on data supplied by other nodes and supplies data to them in turn (via the solution).

This graph structure is specified (mostly) via the provides and needs arguments to the operation() factory, specifically:

needs

this argument names the list of (positionally ordered) input data the operation requires to receive from the solution. The list corresponds, roughly, to the arguments of the underlying function (plus any sideffects).

It can be a single string, in which case a 1-element iterable is assumed.

See also

needs, modifier, FnOp.needs, FnOp.op_needs, FnOp._fn_needs

provides

this argument names the list of (positionally ordered) output data the operation provides into the solution. The list corresponds, roughly, to the values returned by the fn (plus any sideffects & aliases).

It can be a single string, in which case a 1-element iterable is assumed.

If there is more than one, the underlying function must return an iterable with the same number of elements (unless it returns a dictionary).

See also

provides, modifier, FnOp.provides, FnOp.op_provides, FnOp._fn_provides
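The rules above for a single string versus a list, and for matching results to provides, can be sketched as follows. Both helpers (as_list, zip_provides) are hypothetical; in particular, this sketch simply treats any mapping result as already keyed by output name, while the real library may handle dictionary results differently.

```python
from collections.abc import Mapping
from typing import Any, Dict, Iterable, List, Union


def as_list(deps: Union[str, Iterable[str]]) -> List[str]:
    """A single string is taken as a 1-element list."""
    return [deps] if isinstance(deps, str) else list(deps)


def zip_provides(provides: Union[str, Iterable[str]], results: Any) -> Dict[str, Any]:
    """Hypothetical sketch of matching a function's results to its `provides`."""
    provides = as_list(provides)
    if len(provides) == 1:
        # a single output receives the whole result, even if it is iterable
        return {provides[0]: results}
    if isinstance(results, Mapping):
        # a dictionary is taken as already keyed by output name
        return dict(results)
    results = list(results)
    if len(results) != len(provides):
        raise ValueError(f"expected {len(provides)} results, got {len(results)}")
    return dict(zip(provides, results))


print(zip_provides("sum", 9))                      # {'sum': 9}
print(zip_provides(["q", "r"], divmod(7, 2)))      # {'q': 3, 'r': 1}
print(zip_provides(["x", "y"], {"x": 1, "y": 2}))  # {'x': 1, 'y': 2}
```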

Declarations of needs and provides are affected by modifiers like keyword():

Map inputs to different function arguments

graphtik.modifier.keyword(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier

Annotate a needs that (optionally) maps its input name to a keyword-argument name.

The value of a keyword dependency is passed in as keyword argument to the underlying function.

Parameters
  • keyword

    The argument name corresponding to this named input. If it is None, it is assumed to be the same as name, so that it always behaves like a keyword argument and preserves its fn-name if ever renamed.

    Note

    This extra mapping argument is needed either for optionals (but not varargish), or for functions with keyword-only arguments (like def func(*, foo, bar): ...), since inputs are normally fed into functions by position, not by name.

  • accessor – the functions to access values to/from the solution (see Accessor); a 2-tuple of functions is also accepted.

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency (in that case, the dependency name is irrelevant), or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path whose last part is a callable indexer.

Returns

a _Modifier instance, even if no keyword is given OR it is the same as name.

Example:

In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of keyword():

>>> from graphtik import operation, compose, keyword
>>> @operation(needs=['a', keyword("name-in-inputs", "b")], provides="sum")
... def myadd(a, *, b):
...    return a + b
>>> myadd
FnOp(name='myadd',
                    needs=['a', 'name-in-inputs'(>'b')],
                    provides=['sum'],
                    fn='myadd')
>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops: myadd)
>>> sol = graph.compute({"a": 5, "name-in-inputs": 4})['sum']
>>> sol
9
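What keyword() expresses can be mimicked by hand: some inputs are fed by position and others by keyword, possibly under a different name. The call_with_keywords() helper below is hypothetical (not part of graphtik); it takes (input-name, argument-name) pairs in place of keyword() annotations.

```python
from typing import Any, Callable, Dict, List, Tuple


def call_with_keywords(fn: Callable, solution: Dict[str, Any],
                       positional: List[str], keywords: List[Tuple[str, str]]):
    """Hypothetical sketch: feed some inputs by position, others by keyword.

    `keywords` holds (input-name, argument-name) pairs, mimicking what a
    keyword("name-in-inputs", "b") annotation expresses.
    """
    args = [solution[name] for name in positional]
    kwargs = {arg: solution[name] for name, arg in keywords}
    return fn(*args, **kwargs)


def myadd(a, *, b):  # `b` is keyword-only, so it cannot be fed by position
    return a + b


result = call_with_keywords(myadd, {"a": 5, "name-in-inputs": 4},
                            positional=["a"], keywords=[("name-in-inputs", "b")])
print(result)  # 9
```

An input name such as "name-in-inputs" is not a valid Python identifier, so only a by-keyword mapping like this can route it to b.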

Operations may execute with missing inputs

graphtik.modifier.optional(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier

Annotate optionals needs corresponding to defaulted op-function arguments, received only if present in the inputs (when the operation is invoked).

The value of an optional dependency is passed in as a keyword argument to the underlying function.

Parameters
  • keyword – the name of the function argument it corresponds to; if falsy, it is assumed to be the same as name, so that it always behaves like a keyword argument and preserves its fn-name if ever renamed.

  • accessor – the functions to access values to/from the solution (see Accessor); a 2-tuple of functions is also accepted.

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency (in that case, the dependency name is irrelevant), or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path whose last part is a callable indexer.

Example:

>>> from graphtik import operation, compose, optional
>>> @operation(name='myadd',
...            needs=["a", optional("b")],
...            provides="sum")
... def myadd(a, b=0):
...    return a + b

Notice the default value 0 for the argument b, annotated as optional:

>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph',
                 needs=['a', 'b'(?)],
                 provides=['sum'],
                 x1 ops: myadd)

The graph works both with and without b provided in the inputs:

>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}

Like keyword(), you may map an input name to a different function argument:

>>> operation(needs=['a', optional("quasi-real", "b")],
...           provides="sum"
... )(myadd.fn)  # Cannot wrap an operation, its `fn` only.
FnOp(name='myadd',
                    needs=['a', 'quasi-real'(?'b')],
                    provides=['sum'],
                    fn='myadd')
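The effect of optional() can be sketched as "pass it as a keyword only when present, otherwise let the function default apply". The call_with_optionals() helper is hypothetical and only illustrates that behavior; it is not graphtik's implementation.

```python
from typing import Any, Callable, Dict, List


def call_with_optionals(fn: Callable, solution: Dict[str, Any],
                        required: List[str], optionals: List[str]):
    """Hypothetical sketch: optionals are passed as keywords only when present."""
    args = [solution[name] for name in required]  # a missing one raises KeyError
    kwargs = {name: solution[name] for name in optionals if name in solution}
    return fn(*args, **kwargs)


def myadd(a, b=0):
    return a + b


print(call_with_optionals(myadd, {"a": 5, "b": 4}, ["a"], ["b"]))  # 9
print(call_with_optionals(myadd, {"a": 5}, ["a"], ["b"]))          # 5, default b=0
```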

Calling functions with varargs (*args)

graphtik.modifier.vararg(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier

Annotate a varargish needs to be fed into the function’s *args.

Parameters
  • accessor – the functions to access values to/from the solution (see Accessor); a 2-tuple of functions is also accepted.

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency (in that case, the dependency name is irrelevant), or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path whose last part is a callable indexer.

See also

Also consult the example test case in test/test_op.py:test_varargs(), in the full sources of the project.

Example:

We designate b & c as vararg arguments:

>>> from graphtik import operation, compose, vararg
>>> @operation(
...     needs=['a', vararg('b'), vararg('c')],
...     provides='sum'
... )
... def addall(a, *b):
...    return a + sum(b)
>>> addall
FnOp(name='addall',
                    needs=['a', 'b'(*), 'c'(*)],
                    provides=['sum'],
                    fn='addall')
>>> graph = compose('mygraph', addall)

The graph works with and without any of b or c inputs:

>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
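The behavior of vararg() shown above can be sketched like this: each vararg need that is present becomes one extra positional item, and missing ones are skipped. call_with_varargs() is a hypothetical helper for illustration, not graphtik code.

```python
from typing import Any, Callable, Dict, List


def call_with_varargs(fn: Callable, solution: Dict[str, Any],
                      positional: List[str], varargs: List[str]):
    """Hypothetical sketch: each present vararg need becomes one *args item."""
    args = [solution[name] for name in positional]
    # missing varargish inputs are simply skipped, like optionals
    args.extend(solution[name] for name in varargs if name in solution)
    return fn(*args)


def addall(a, *b):
    return a + sum(b)


print(call_with_varargs(addall, {"a": 5, "b": 2, "c": 4}, ["a"], ["b", "c"]))  # 11
print(call_with_varargs(addall, {"a": 5}, ["a"], ["b", "c"]))                  # 5
```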

graphtik.modifier.varargs(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier

A varargish vararg(), naming an iterable value in the inputs.

Parameters
  • accessor – the functions to access values to/from the solution (see Accessor); a 2-tuple of functions is also accepted.

  • jsonp

    If given, it may be the pre-split parts of the json pointer path for the dependency (in that case, the dependency name is irrelevant), or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any slashes it contains.

    Tip

    If accessing pandas, you may pass an already split path whose last part is a callable indexer.

See also

Also consult the example test case in test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...    return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...     provides='sum')(enlist)
... )
>>> graph
Pipeline('mygraph',
                 needs=['a', 'b'(?)],
                 provides=['sum'],
                 x1 ops: enlist)

The graph works with or without b in the inputs:

>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist', needs=['a', 'b'(+)], provides=['sum'], fn='enlist')

Attention

To avoid user mistakes, varargs do not accept str inputs (even though strings are iterables):

>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist',
                           needs=['a', 'b'(+)],
                           provides=['sum'],
                           fn='enlist')
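The check behind these errors can be sketched as follows. The hypothetical collect_varargs() helper expands an iterable input into *args items and rejects both non-iterables and str; its error message merely imitates the one above and is not graphtik's code.

```python
from collections.abc import Iterable
from typing import Any, Dict, List


def collect_varargs(solution: Dict[str, Any], name: str) -> List[Any]:
    """Hypothetical sketch: expand an iterable input into *args items,
    refusing `str` (iterable, but almost always a mistake here)."""
    if name not in solution:
        return []
    value = solution[name]
    if isinstance(value, str) or not isinstance(value, Iterable):
        raise ValueError(f"Expected needs[{name!r}] to be non-str iterables!")
    return list(value)


def enlist(a, *b):
    return [a] + list(b)


print(enlist(5, *collect_varargs({"b": [2, 20]}, "b")))  # [5, 2, 20]
```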

See also

The elaborate example in the Hierarchical data and further tricks section.

Aliased provides

Sometimes you need to interface functions & operations that name a dependency differently. This is doable without introducing “pipe-through” interface operations, either by annotating certain needs with the keyword() modifier (above), or by aliasing certain provides to different names:

>>> op = operation(str,
...                name="`provides` with `aliases`",
...                needs="anything",
...                provides="real thing",
...                aliases=("real thing", "phony"))
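Conceptually, an alias makes the same provided value available under an extra name. The sketch below assumes aliases are (source, alias) pairs and uses a hypothetical apply_aliases() helper; it illustrates the idea, not how graphtik stores aliases internally.

```python
from typing import Any, Dict, Iterable, Tuple


def apply_aliases(outputs: Dict[str, Any],
                  aliases: Iterable[Tuple[str, str]]) -> Dict[str, Any]:
    """Hypothetical sketch: copy each provided value under its alias name too."""
    aliased = dict(outputs)
    for src, dst in aliases:
        aliased[dst] = outputs[src]
    return aliased


outputs = {"real thing": str(42)}  # what `str` would provide for needs "anything"=42
print(apply_aliases(outputs, [("real thing", "phony")]))
# {'real thing': '42', 'phony': '42'}
```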

Default conveyor operation

If you don’t specify a callable, the default identity function gets assigned, as long as a name for the operation is given and the number of needs matches the number of provides.

This facilitates conveying inputs into renamed outputs without the need to define a trivial identity function matching the needs & provides each time:

>>> from graphtik import keyword, optional, vararg
>>> op = operation(
...     None,
...     name="a",
...     needs=[optional("opt"), vararg("vararg"), "pos", keyword("kw")],
...     # positional vararg, keyword, optional
...     provides=["pos", "vararg", "kw", "opt"],
... )
>>> op(opt=5, vararg=6, pos=7, kw=8)
{'pos': 7, 'vararg': 6, 'kw': 5, 'opt': 8}

Notice that the order of the results is not that of the needs (or that of the inputs in the compute() method); as the comment-line explains, it follows Python semantics: positional arguments come first, then varargs, then keyword arguments.
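The output above can be reproduced with a plain identity function, assuming (as a hypothetical reconstruction, not graphtik's code) that positionals and varargs arrive as *args while optionals and keywords arrive as **kwargs in needs order, and that the results are zipped with provides in that order.

```python
def identity_conveyor(*args, **kwargs):
    """Hypothetical identity function: positionals first, then keyword values,
    in the order Python received them."""
    return (*args, *kwargs.values())


# `pos` positionally, then the vararg, then the keywords in needs order
# (`opt` is declared before `kw`)
results = identity_conveyor(7, 6, opt=5, kw=8)
provides = ["pos", "vararg", "kw", "opt"]
print(dict(zip(provides, results)))
# {'pos': 7, 'vararg': 6, 'kw': 5, 'opt': 8}
```

Under this reading, listing the provides as ["pos", "vararg", "opt", "kw"] would route each value back to its own name.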

Considerations for when building pipelines

When many operations are composed into a computation graph, Graphtik matches up the names in their needs and provides to form the edges of that graph (see Pipelines for more on that), like the operations from the script in Quick start:

>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> def abspow(a, p):
...   """Compute |a|^p. """
...   c = abs(a) ** p
...   return c
>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
...    operation(mul, needs=["a", "b"], provides=["ab"]),
...    operation(sub, needs=["a", "ab"], provides=["a_minus_ab"]),
...    operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
...    (partial(abspow, p=3))
... )
>>> graphop
Pipeline('graphop',
                 needs=['a', 'b', 'ab', 'a_minus_ab'],
                 provides=['ab', 'a_minus_ab', 'abs_a_minus_ab_cubed'],
                 x3 ops: mul, sub, abspow1)

  • Notice the use of functools.partial() to set parameter p to a constant value.

  • And this is done by calling once more the “decorator” returned by operation() when it is called without a function.
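How matching names become edges can be sketched without graphtik: each need yields a data → operation edge and each provide an operation → data edge, so "ab" links mul to sub. This is a simplified illustration with hypothetical variable names; the ops list is already in dependency order, so no topological sort is needed here.

```python
from functools import partial
from operator import mul, sub


def abspow(a, p):
    """Compute |a|^p."""
    return abs(a) ** p


# (name, fn, needs, provides) mirroring the pipeline above
ops = [
    ("mul", mul, ["a", "b"], ["ab"]),
    ("sub", sub, ["a", "ab"], ["a_minus_ab"]),
    ("abspow1", partial(abspow, p=3), ["a_minus_ab"], ["abs_a_minus_ab_cubed"]),
]

# data -> operation for each need, operation -> data for each provide
edges = ([(d, name) for name, _, needs, _ in ops for d in needs]
         + [(name, d) for name, _, _, provides in ops for d in provides])

# run the ops in the listed order, feeding needs positionally
solution = {"a": 2, "b": 5}
for name, fn, needs, provides in ops:
    solution[provides[0]] = fn(*(solution[n] for n in needs))
print(solution["abs_a_minus_ab_cubed"])  # 512, i.e. |2 - 2*5|**3
```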

The needs and provides arguments to the operations in this script define a computation graph that looks like this: