1. Operations¶
An operation is a function in a computation pipeline,
abstractly represented by the Operation
class.
This class specifies the dependencies forming the pipeline’s
network.
Defining Operations¶
You may inherit the Operation
abstract class and override its
Operation.compute()
method to manually do the following:
fetch the operation’s needs from the solution and match those values into function arguments,
call your function to do its business,
“zip” the function’s results with the operation’s declared provides, and finally
hand back those zipped values to the solution for further actions.
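The steps above can be sketched in plain Python. This is a hypothetical, minimal wrapper written for illustration only (the names `naive_compute` and its signature are not graphtik's actual API):

```python
def naive_compute(fn, needs, provides, solution):
    # 1. match solution values into function arguments
    args = [solution[name] for name in needs]
    # 2. call the function to do its business
    results = fn(*args)
    # 3. "zip" the results with the operation's declared `provides`
    if len(provides) == 1:
        results = [results]
    # 4. hand back the zipped values, to be merged into the solution
    return dict(zip(provides, results))

naive_compute(lambda a, b: a + b, ["a", "b"], ["a_plus_b"], {"a": 3, "b": 4})
# {'a_plus_b': 7}
```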
But there is an easier way – actually, half of the code in this project is dedicated to retrofitting existing functions, unaware of all this, into operations.
Operations from existing functions¶
The FnOp class
provides a concrete, lightweight wrapper
around any arbitrary function, to define and execute it within a pipeline.
Use the operation()
factory to instantiate one:
>>> from operator import add
>>> from graphtik import operation
>>> add_op = operation(add,
... needs=['a', 'b'],
... provides=['a_plus_b'])
>>> add_op
FnOp(name='add', needs=['a', 'b'], provides=['a_plus_b'], fn='add')
You may still call the original function at FnOp.fn,
thus bypassing any operation pre-processing:
>>> add_op.fn(3, 4)
7
But the proper way is to call the operation (either directly or by calling the
FnOp.compute()
method). Notice though that unnamed
positional parameters are not supported:
>>> add_op(a=3, b=4)
{'a_plus_b': 7}
Tip
In case your function needs to access the execution
machinery
or its wrapping operation, it can do that through the task_context
(unstable API, not working during parallel execution, see Accessing wrapper operation from task-context)
Builder pattern¶
There are two ways to instantiate FnOps, each one suitable
for different scenarios.
We’ve seen that calling operation() manually allows putting into a pipeline
functions that are defined elsewhere (e.g. in another module, or are system functions).
But that method is also useful if you want to create multiple operation instances
with similar attributes, e.g. needs
:
>>> op_factory = operation(needs=['a'])
Notice that no fn was specified above, so we got back a “builder” (and not a FnOp instance);
each withset() call below supplies an fn, producing an actual FnOp:
>>> from graphtik import operation, compose
>>> from functools import partial
>>> def mypow(a, p=2):
... return a ** p
>>> pow_op2 = op_factory.withset(fn=mypow, provides="^2")
>>> pow_op3 = op_factory.withset(fn=partial(mypow, p=3), name='pow_3', provides='^3')
>>> pow_op0 = op_factory.withset(fn=lambda a: 1, name='pow_0', provides='^0')
>>> graphop = compose('powers', pow_op2, pow_op3, pow_op0)
>>> graphop
Pipeline('powers', needs=['a'], provides=['^2', '^3', '^0'], x3 ops:
mypow, pow_3, pow_0)
>>> graphop(a=2)
{'a': 2, '^2': 4, '^3': 8, '^0': 1}
Tip
See Plotting on how to make diagrams like this.
Decorator specification¶
If you are defining your computation graph and the functions that comprise it all in the same script,
the decorator specification of operation
instances might be particularly useful,
as it allows you to assign computation graph structure to functions as they are defined.
Here’s an example:
>>> from graphtik import operation, compose
>>> @operation(needs=['b', 'a', 'r'], provides='bar')
... def foo(a, b, c):
... return c * (a + b)
>>> graphop = compose('foo_graph', foo)
Notice that if
name
is not given, it is deduced from the function name.
Specifying graph structure: provides
and needs
¶
Each operation is a node in a computation graph, depending and supplying data from and to other nodes (via the solution), in order to compute.
This graph structure is specified (mostly) via the provides
and needs
arguments
to the operation()
factory, specifically:
needs
this argument names the list of (positionally ordered) input data the operation requires to receive from the solution. The list corresponds, roughly, to the arguments of the underlying function (plus any sideffects).
It can be a single string, in which case a 1-element iterable is assumed.
- seealso
needs, modifier, FnOp.needs, FnOp.op_needs, FnOp._fn_needs
provides
this argument names the list of (positionally ordered) output data the operation provides into the solution. The list corresponds, roughly, to the returned values of the fn (plus any sideffects & aliases).
It can be a single string, in which case a 1-element iterable is assumed.
If there are more than one, the underlying function must return an iterable with the same number of elements (unless it returns a dictionary).
- seealso
provides, modifier, FnOp.provides, FnOp.op_provides, FnOp._fn_provides
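In plain Python terms (no graphtik involved), the zipping rule for multiple provides looks roughly like this sketch:

```python
# With two `provides`, the function must return an iterable of two values;
# the operation then pairs them up positionally (illustrative sketch):
def quot_rem(a, b):
    return divmod(a, b)          # a 2-element tuple

provides = ["quot", "rem"]
results = quot_rem(7, 3)
assert len(results) == len(provides)   # mismatch would be an error
dict(zip(provides, results))           # {'quot': 2, 'rem': 1}
```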
Declarations of needs and provides are affected by modifiers like
keyword()
:
Map inputs to different function arguments¶
-
graphtik.modifier.keyword(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]
Annotate a needs that (optionally) maps an inputs name –> keyword argument name.
The value of a keyword dependency is passed in as a keyword argument to the underlying function.
- Parameters
keyword – The argument-name corresponding to this named-input. If it is None, it is assumed the same as name, so as to always behave like a kw-type arg, and to preserve its fn-name if ever renamed.
accessor – the functions to access values to/from the solution (see Accessor) (actually a 2-tuple with functions is ok)
jsonp – If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any containing slashes.
Tip
If accessing pandas, you may pass an already split path, with its last part being a callable indexer.
- Returns
a _Modifier instance, even if no keyword is given OR it is the same as name.
Example:
In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of keyword():
>>> from graphtik import operation, compose, keyword
>>> @operation(needs=['a', keyword("name-in-inputs", "b")], provides="sum")
... def myadd(a, *, b):
...    return a + b
>>> myadd
FnOp(name='myadd', needs=['a', 'name-in-inputs'(>'b')], provides=['sum'], fn='myadd')
>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops: myadd)
>>> sol = graph.compute({"a": 5, "name-in-inputs": 4})['sum']
>>> sol
9
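Stripped of graphtik, the renaming boils down to ordinary Python keyword-argument passing. A plain-Python sketch of what the wrapper does (illustrative only, not graphtik internals):

```python
def myadd(a, *, b):          # `b` is keyword-only
    return a + b

inputs = {"a": 5, "name-in-inputs": 4}
# keyword("name-in-inputs", "b") tells the wrapper to pass that
# input's value under the function's argument name `b`:
myadd(inputs["a"], b=inputs["name-in-inputs"])
# 9
```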
Operations may execute with missing inputs¶
-
graphtik.modifier.optional(name: str, keyword: str = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]
Annotate optionals needs corresponding to defaulted op-function arguments, …
received only if present in the inputs (when the operation is invoked).
The value of an optional dependency is passed in as a keyword argument to the underlying function.
- Parameters
keyword – the name of the function argument it corresponds to; if a falsy is given, the same as name is assumed, to always behave like a kw-type arg and to preserve its fn-name if ever renamed.
accessor – the functions to access values to/from the solution (see Accessor) (actually a 2-tuple with functions is ok)
jsonp – If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any containing slashes.
Tip
If accessing pandas, you may pass an already split path, with its last part being a callable indexer.
Example:
>>> from graphtik import operation, compose, optional
>>> @operation(name='myadd',
...            needs=["a", optional("b")],
...            provides="sum")
... def myadd(a, b=0):
...    return a + b

Notice the default value 0 of the b argument, annotated as optional:
>>> graph = compose('mygraph', myadd)
>>> graph
Pipeline('mygraph', needs=['a', 'b'(?)], provides=['sum'], x1 ops: myadd)

The graph works both with and without b provided in the inputs:
>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}
Like keyword() you may map an input name to a different function argument:
>>> operation(needs=['a', optional("quasi-real", "b")],
...           provides="sum"
... )(myadd.fn)  # Cannot wrap an operation, its `fn` only.
FnOp(name='myadd', needs=['a', 'quasi-real'(?'b')], provides=['sum'], fn='myadd')
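What optional() amounts to can be illustrated in plain Python (a sketch, not the real implementation): the dependency is passed as a keyword argument only when present in the inputs, so the function's own default applies otherwise:

```python
def myadd(a, b=0):
    return a + b

def call_with_optionals(fn, inputs, optionals):
    # pass an optional dependency only if it actually arrived in the inputs
    kwargs = {k: inputs[k] for k in optionals if k in inputs}
    return fn(inputs["a"], **kwargs)

call_with_optionals(myadd, {"a": 5, "b": 4}, ["b"])   # b present  -> 9
call_with_optionals(myadd, {"a": 5}, ["b"])           # b missing  -> default 0 -> 5
```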
Calling functions with varargs (*args
)¶
-
graphtik.modifier.vararg(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]
Annotate a varargish needs to be fed as the function’s *args.
- Parameters
accessor – the functions to access values to/from the solution (see Accessor) (actually a 2-tuple with functions is ok)
jsonp – If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any containing slashes.
Tip
If accessing pandas, you may pass an already split path, with its last part being a callable indexer.
See also
Consult also the example test-case in test/test_op.py:test_varargs(), in the full sources of the project.
Example:
We designate b & c as vararg arguments:
>>> from graphtik import operation, compose, vararg
>>> @operation(
...     needs=['a', vararg('b'), vararg('c')],
...     provides='sum'
... )
... def addall(a, *b):
...    return a + sum(b)
>>> addall
FnOp(name='addall', needs=['a', 'b'(*), 'c'(*)], provides=['sum'], fn='addall')
>>> graph = compose('mygraph', addall)
The graph works with or without any of the b or c inputs:
>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
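Behind the scenes this is ordinary *args packing; each vararg dependency contributes (at most) one positional value. A plain-Python sketch of what vararg() implies (illustrative, not graphtik internals):

```python
def addall(a, *b):
    return a + sum(b)

def call_with_varargs(fn, inputs, varargish):
    # collect only the varargish values that are actually present
    extras = [inputs[n] for n in varargish if n in inputs]
    return fn(inputs["a"], *extras)

call_with_varargs(addall, {"a": 5, "b": 2, "c": 4}, ["b", "c"])  # 11
call_with_varargs(addall, {"a": 5}, ["b", "c"])                  # 5
```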
-
graphtik.modifier.varargs(name: str, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier[source]
A varargish like vararg(), naming an iterable value in the inputs.
- Parameters
accessor – the functions to access values to/from the solution (see Accessor) (actually a 2-tuple with functions is ok)
jsonp – If given, it may be the pre-split parts of the json pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpreting of the dependency name as a json pointer path, regardless of any containing slashes.
Tip
If accessing pandas, you may pass an already split path, with its last part being a callable indexer.
See also
Consult also the example test-case in test/test_op.py:test_varargs(), in the full sources of the project.
Example:
>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...    return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...               provides='sum')(enlist)
... )
>>> graph
Pipeline('mygraph', needs=['a', 'b'(?)], provides=['sum'], x1 ops: enlist)
The graph works with or without b in the inputs:
>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist', needs=['a', 'b'(+)], provides=['sum'], fn='enlist')
Attention
To avoid user mistakes, varargs do not accept str inputs (though other iterables are fine):
>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
ValueError: Failed preparing needs:
    1. Expected needs['b'(+)] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FnOp(name='enlist', needs=['a', 'b'(+)], provides=['sum'], fn='enlist')
See also
The elaborate example in Hierarchical data and further tricks section.
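The difference from vararg() is that a single named input holds the whole iterable, which is unpacked into the function's *args. Sketched in plain Python (illustrative only), including the same non-str guard the error message above refers to:

```python
def enlist(a, *b):
    return [a] + list(b)

def call_with_varargs_iterable(fn, a, b=()):
    # guard against the classic mistake: a string is iterable too
    if isinstance(b, str):
        raise ValueError("Expected needs to be non-str iterables!")
    return fn(a, *b)

call_with_varargs_iterable(enlist, 5, [2, 20])   # [5, 2, 20]
call_with_varargs_iterable(enlist, 5)            # [5]
```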
Aliased provides¶
Sometimes, you need to interface functions & operations where they name a
dependency differently.
This is doable without introducing a “pipe-through” interface operation, either
by annotating certain needs with the keyword() modifier (above), or
by aliasing certain provides to different names:
>>> op = operation(str,
... name="`provides` with `aliases`",
... needs="anything",
... provides="real thing",
... aliases=("real thing", "phony"))
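An alias simply copies a computed value under an extra name in the results. In plain Python (a sketch for illustration, not graphtik's internals):

```python
def apply_aliases(results, aliases):
    # each (source, alias) pair duplicates an existing provide under a new name
    for src, dst in aliases:
        results[dst] = results[src]
    return results

apply_aliases({"real thing": "7"}, [("real thing", "phony")])
# {'real thing': '7', 'phony': '7'}
```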
Default conveyor operation¶
If you don’t specify a callable, the default identity function gets assigned, as long as a name for the operation is given and the number of needs matches the number of provides.
This facilitates conveying inputs into renamed outputs without the need to define a trivial identity function matching the needs & provides each time:
>>> from graphtik import keyword, optional, vararg
>>> op = operation(
... None,
... name="a",
... needs=[optional("opt"), vararg("vararg"), "pos", keyword("kw")],
...     # positional, vararg, keyword, optional
... provides=["pos", "vararg", "kw", "opt"],
... )
>>> op(opt=5, vararg=6, pos=7, kw=8)
{'pos': 7, 'vararg': 6, 'kw': 8, 'opt': 5}
Notice that the order of the results is not that of the needs
(or that of the inputs in the compute()
method), but, as explained in the comment-line,
it follows Python semantics.
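The reordering mirrors how Python itself lays out a function signature: positionals first, then *args, then keyword-only and defaulted arguments. A plain-Python illustration (this `conveyor` function is a hypothetical stand-in, not graphtik's actual identity fn):

```python
# Signature ordering: positional, *args, keyword-only, defaulted (optional)
def conveyor(pos, *vararg, kw, opt=None):
    return (pos, *vararg, kw, opt)

conveyor(7, 6, kw=8, opt=5)   # (7, 6, 8, 5) -- values reordered per the signature
```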
Considerations for when building pipelines¶
When many operations are composed into a computation graph, Graphtik matches up the values in their needs and provides to form the edges of that graph (see Pipelines for more on that), like the operations from the script in Quick start:
>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> def abspow(a, p):
... """Compute |a|^p. """
... c = abs(a) ** p
... return c
>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
... operation(mul, needs=["a", "b"], provides=["ab"]),
... operation(sub, needs=["a", "ab"], provides=["a_minus_ab"]),
... operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
... (partial(abspow, p=3))
... )
>>> graphop
Pipeline('graphop',
needs=['a', 'b', 'ab', 'a_minus_ab'],
provides=['ab', 'a_minus_ab', 'abs_a_minus_ab_cubed'],
x3 ops: mul, sub, abspow1)
Notice the use of functools.partial() to set parameter p to a constant value.
This is done by calling, once more, the “decorator” returned from operation() when it is invoked without a function.
The needs
and provides
arguments to the operations in this script define
a computation graph that looks like this: