1. Operations
At a high level, an operation is a node in a computation graph. Graphtik uses an Operation class to abstractly represent these computations. The class specifies the requirements for a function to participate in a computation graph: its input-data needs, and the output-data it provides.
The FunctionalOperation provides a lightweight wrapper around an arbitrary function to define those specifications.
class graphtik.op.FunctionalOperation(fn: Callable, name, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None)
An operation performing a callable (i.e. a function, a method, a lambda).
real_provides
Value names the underlying function provides (without aliases, with(!) sideffects).
FIXME: real_provides not sure what it does with sideffects
Tip
Use the operation() builder class to build instances of this class instead.
__call__(*args, **kwargs)
Call self as a function.
__init__(fn: Callable, name, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None)
Build a new operation out of some function and its requirements.
Parameters:
- name – a name for the operation (e.g. ‘conv1’, ‘sum’, etc.); it will be prefixed by parents.
- needs – Names of input data objects this operation requires.
- provides – Names of the real output values the underlying function provides (without aliases, with(!) sideffects).
- aliases – an optional mapping of real provides to additional ones, together comprising this operation's provides.
- parents – a tuple with the names of the parents, prefixing name, but also kept for equality/hash checks.
- rescheduled – If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).
- endured – If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.
- parallel – execute in parallel.
- marshalled – If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).
- returns_dict – if true, the fn returns a dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides).
- node_props – added as-is into the NetworkX graph.
compute(named_inputs, outputs=None) → dict
Compute (optional) asked outputs for the given named_inputs.
It is called by Network. End-users should simply call the operation with named_inputs as kwargs.
Parameters: named_inputs – the input values with which to feed the computation.
Returns: a dict with the values resulting from running the feed-forward computation on the inputs.
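To make the flow of compute() more concrete, the sketch below walks through the steps conceptually involved: pick the needs out of the named inputs and feed them by position, pair the result with provides (or take it as-is when returns_dict is true), copy values under their aliases, and keep only the asked outputs. The helper toy_compute() and its argument layout are hypothetical; this is not graphtik's implementation, and it ignores modifiers, sideffects, rescheduling and endurance.

```python
from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence


def toy_compute(
    fn: Callable,
    needs: Sequence[str],
    provides: Sequence[str],
    named_inputs: Mapping[str, Any],
    outputs: Optional[Iterable[str]] = None,
    aliases: Optional[Mapping[str, str]] = None,
    returns_dict: bool = False,
) -> Dict[str, Any]:
    """Hypothetical stand-in for an operation's compute(), for illustration only."""
    # Feed the needs into the function by position, in declaration order.
    result = fn(*[named_inputs[n] for n in needs])

    if returns_dict:
        # The function already returned {provide: value}; take it as-is.
        results = dict(result)
    elif not provides:
        # Nothing is expected back from the function.
        results = {}
    elif len(provides) == 1:
        # A single provide receives the whole return value.
        results = {provides[0]: result}
    else:
        # Several provides: the function must return an iterable of matching length.
        values = list(result)
        if len(values) != len(provides):
            raise ValueError(f"expected {len(provides)} values, got {len(values)}")
        results = dict(zip(provides, values))

    # Aliases copy the value of a real provide under an additional name.
    for real, alias in (aliases or {}).items():
        results[alias] = results[real]

    # If specific outputs were asked, keep only those.
    if outputs is not None:
        asked = set(outputs)
        results = {k: v for k, v in results.items() if k in asked}
    return results


print(toy_compute(divmod, ["a", "b"], ["quot", "rem"], {"a": 7, "b": 2}))
# {'quot': 3, 'rem': 1}
print(toy_compute(divmod, ["a", "b"], ["quot", "rem"], {"a": 7, "b": 2},
                  outputs=["remainder"], aliases={"rem": "remainder"}))
# {'remainder': 1}
```

Zipping by position is why multiple provides require the function to return an iterable, while returns_dict skips that pairing entirely.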
The operation builder factory
There is a better way to instantiate a FunctionalOperation than simply constructing it: use the operation builder class:
class graphtik.operation(fn: Callable = None, *, name=None, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None)
A builder for graph-operations wrapping functions.
Parameters:
- fn (function) – The function used by this operation. This does not need to be specified when the operation object is instantiated and can instead be set via __call__ later.
- name (str) – The name of the operation in the computation graph.
- needs – Names of input data objects this operation requires. These should correspond to the args of fn.
- provides – Names of output data objects this operation provides. If more than one is given, they must be returned in an iterable, unless returns_dict is true, in which case a dictionary with as many elements must be returned.
- aliases – an optional mapping of provides to additional ones.
- rescheduled – If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).
- endured – If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.
- parallel – execute in parallel.
- marshalled – If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).
- returns_dict – if true, the fn returns a dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides).
- node_props – added as-is into the NetworkX graph.
Returns: when called, it returns a FunctionalOperation.
Example:
This is an example of its use, based on the “builder pattern”:
>>> from graphtik import operation
>>> opb = operation(name='add_op')
>>> opb.withset(needs=['a', 'b'])
operation(name='add_op', needs=['a', 'b'], provides=[], fn=None)
>>> opb.withset(provides='SUM', fn=sum)
operation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')
You may keep calling withset() until you invoke a final __call__() on the builder; then you get the actual FunctionalOperation instance:
>>> # Create `Operation` and overwrite function at the last moment.
>>> opb(sum)
FunctionalOperation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')
Tip
Remember to call the builder once more at the end, to get the actual operation instance.
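The builder pattern itself can be illustrated with a small, hypothetical pair of classes: withset() accumulates keyword settings, and the final call freezes them into a product object. ToyBuilder and ToyOp are invented names for this sketch only. It returns a fresh builder from withset() instead of mutating the existing one; that is a design choice of the sketch, not a statement about how graphtik's own withset() behaves.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict, List, Optional


@dataclass(frozen=True)
class ToyOp:
    """Hypothetical final product, standing in for FunctionalOperation."""
    name: str
    fn: Callable
    needs: List[str]
    provides: List[str]


@dataclass(frozen=True)
class ToyBuilder:
    """Hypothetical builder accumulating settings until it is called."""
    settings: Dict[str, Any] = field(default_factory=dict)

    def withset(self, **kw: Any) -> "ToyBuilder":
        # Merge the non-None settings over the old ones, into a new builder.
        merged = {**self.settings, **{k: v for k, v in kw.items() if v is not None}}
        return replace(self, settings=merged)

    def __call__(self, fn: Optional[Callable] = None, **kw: Any) -> ToyOp:
        # A last-moment fn (or any other keyword) overrides earlier settings.
        final = self.withset(fn=fn, **kw).settings
        if final.get("fn") is None:
            raise ValueError("a function must be given before building the operation")
        return ToyOp(
            name=final["name"],
            fn=final["fn"],
            needs=list(final.get("needs", [])),
            provides=list(final.get("provides", [])),
        )


opb = ToyBuilder().withset(name="add_op").withset(needs=["a", "b"])
opb = opb.withset(provides=["SUM"])
op = opb(lambda a, b: a + b)
print(op.name, op.needs, op.provides, op.fn(3, 4))  # add_op ['a', 'b'] ['SUM'] 7
```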
__call__(fn: Callable = None, *, name=None, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None) → graphtik.op.FunctionalOperation
This enables operation to act as a decorator or as a functional operation, for example:
    from graphtik import operation

    @operation(name='myadd1', needs=['a', 'b'], provides=['c'])
    def myadd(a, b):
        return a + b
or:
    def myadd(a, b):
        return a + b

    operation(name='myadd1', needs=['a', 'b'], provides=['c'])(myadd)
Parameters: fn (function) – The function to be used by this operation.
Returns: a FunctionalOperation that can be called as a function or composed into a computation graph.
withset(*, fn: Callable = None, name=None, needs: Union[Collection[T_co], str, None] = None, provides: Union[Collection[T_co], str, None] = None, aliases: Mapping[KT, VT_co] = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping[KT, VT_co] = None) → graphtik.op.operation
See operation for arguments here.
Operations are just functions
At the heart of each operation is just a function, any arbitrary function. Indeed, you can instantiate an operation with a function and then call it just like the original function, e.g.:
>>> from operator import add
>>> from graphtik import operation
>>> add_op = operation(name='add_op', needs=['a', 'b'], provides=['a_plus_b'])(add)
>>> add_op(3, 4) == add(3, 4)
True
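The idea that an operation stays callable just like its underlying function can be sketched with a tiny wrapper class that keeps needs and provides next to the function and forwards calls to it unchanged. ToyOperation is a hypothetical name used only for illustration; the real FunctionalOperation takes many more parameters, as its signature above shows.

```python
from operator import add
from typing import Any, Callable, List


class ToyOperation:
    """Hypothetical wrapper: graph metadata plus a plain callable."""

    def __init__(self, fn: Callable, name: str, needs: List[str], provides: List[str]):
        self.fn = fn
        self.name = name
        self.needs = list(needs)
        self.provides = list(provides)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Calling the operation directly just calls the wrapped function.
        return self.fn(*args, **kwargs)

    def __repr__(self) -> str:
        fn_name = getattr(self.fn, "__name__", repr(self.fn))
        return (f"ToyOperation(name={self.name!r}, needs={self.needs!r}, "
                f"provides={self.provides!r}, fn={fn_name!r})")


add_op = ToyOperation(add, "add_op", needs=["a", "b"], provides=["a_plus_b"])
print(add_op(3, 4) == add(3, 4))  # True
print(add_op)
```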
Specifying graph structure: provides and needs
Of course, each operation is more than just a function. It is a node in a computation graph, depending on other nodes in the graph for input data and supplying output data that may be used by other nodes in the graph (or as a graph output). This graph structure is specified via the provides and needs arguments to the operation constructor. Specifically:
- provides: this argument names the outputs (i.e. the returned values) of a given operation. If multiple outputs are specified by provides, then the function comprising the operation must return an iterable.
- needs: this argument names data that is needed as input by a given operation. Each piece of data named in needs may either be provided by another operation in the same graph (i.e. specified in the provides argument of that operation), or it may be specified as a named input to a graph computation (more on graph computations here).
When many operations are composed into a computation graph (see Graph Composition for more on that), Graphtik matches up the values in their needs and provides to form the edges of that graph.
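That matching can be sketched in plain Python: every need becomes a data-to-operation edge and every provide an operation-to-data edge, so two operations become connected wherever one's provide has the same name as the other's need. The functions graph_edges() and op_dependencies(), and the name-to-(needs, provides) dict layout, are hypothetical; graphtik builds a NetworkX graph instead (as the node_props parameter above suggests), and this sketch ignores modifiers. The operation names are those of the Quick start script.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical description of operations: name -> (needs, provides).
Ops = Dict[str, Tuple[List[str], List[str]]]


def graph_edges(ops: Ops) -> Set[Tuple[str, str]]:
    """Return directed edges between data names and operation names."""
    edges = set()
    for op_name, (needs, provides) in ops.items():
        for data in needs:
            edges.add((data, op_name))  # data flows into the operation
        for data in provides:
            edges.add((op_name, data))  # the operation produces the data
    return edges


def op_dependencies(ops: Ops) -> Dict[str, Set[str]]:
    """For each operation, the operations whose provides feed its needs."""
    # If several operations provide the same name, the last one wins here.
    producers = {d: op for op, (_, provs) in ops.items() for d in provs}
    return {
        op: {producers[d] for d in needs if d in producers}
        for op, (needs, _) in ops.items()
    }


ops = {
    "mul1": (["a", "b"], ["ab"]),
    "sub1": (["a", "ab"], ["a_minus_ab"]),
    "abspow1": (["a_minus_ab"], ["abs_a_minus_ab_cubed"]),
}
print(sorted(graph_edges(ops)))
print(op_dependencies(ops))  # {'mul1': set(), 'sub1': {'mul1'}, 'abspow1': {'sub1'}}
```

Inputs such as a and b have no producer, so they must be given as named inputs to the computation.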
Let’s look again at the operations from the script in Quick start, for example:
>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> # Computes |a|^p.
>>> def abspow(a, p):
... c = abs(a) ** p
... return c
>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
... operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
... operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub),
... operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
... (partial(abspow, p=3))
... )
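For reference, that graph amounts to evaluating the three functions in dependency order by hand. The sketch below does exactly that in plain Python, without graphtik; the inputs a=2 and b=5 are chosen only for illustration, and the dict plays the role of the solution, accumulating each provided value under its name.

```python
from functools import partial
from operator import mul, sub


def abspow(a, p):
    # Computes |a|^p, as in the script above.
    return abs(a) ** p


# Example inputs, chosen only for illustration.
solution = {"a": 2, "b": 5}
# Each step reads its needs from the solution and stores its provides back.
solution["ab"] = mul(solution["a"], solution["b"])  # mul1
solution["a_minus_ab"] = sub(solution["a"], solution["ab"])  # sub1
solution["abs_a_minus_ab_cubed"] = partial(abspow, p=3)(solution["a_minus_ab"])  # abspow1
print(solution)
# {'a': 2, 'b': 5, 'ab': 10, 'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}
```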
Tip
Notice the use of functools.partial() to set parameter p to a constant value.
The needs and provides arguments to the operations in this script define a computation graph that looks like this (where the ovals are operations and the squares/houses are data):
Tip
See Plotting for how to make diagrams like this.
Instantiating operations
There are several ways to instantiate an operation, each of which might be more suitable for different scenarios.
Decorator specification
If you are defining your computation graph and the functions that comprise it all in the same script, the decorator specification of operation instances might be particularly useful, as it allows you to assign computation graph structure to functions as they are defined. Here’s an example:
>>> from graphtik import operation, compose
>>> @operation(name='foo_op', needs=['a', 'b', 'c'], provides='foo')
... def foo(a, b, c):
... return c * (a + b)
>>> graphop = compose('foo_graph', foo)
Functional specification
If the functions underlying your computation graph operations are defined elsewhere than the script in which your graph itself is defined (e.g. they are defined in another module, or they are system functions), you can use the functional specification of operation instances:
>>> from operator import add, mul
>>> from graphtik import operation, compose
>>> add_op = operation(name='add_op', needs=['a', 'b'], provides='sum')(add)
>>> mul_op = operation(name='mul_op', needs=['c', 'sum'], provides='product')(mul)
>>> graphop = compose('add_mul_graph', add_op, mul_op)
The functional specification is also useful if you want to create multiple operation instances from the same function, perhaps with different parameter values, e.g.:
>>> from functools import partial
>>> def mypow(a, p=2):
... return a ** p
>>> pow_op1 = operation(name='pow_op1', needs=['a'], provides='a_squared')(mypow)
>>> pow_op2 = operation(name='pow_op2', needs=['a'], provides='a_cubed')(partial(mypow, p=3))
>>> graphop = compose('two_pows_graph', pow_op1, pow_op2)
A slightly different approach can be used here to accomplish the same effect, by using the operation “builder pattern”:
>>> def mypow(a, p=2):
... return a ** p
>>> pow_op_factory = operation(mypow, needs=['a'], provides='a_squared')
>>> pow_op1 = pow_op_factory(name='pow_op1')
>>> pow_op2 = pow_op_factory.withset(name='pow_op2', provides='a_cubed')(partial(mypow, p=3))
>>> pow_op3 = pow_op_factory(lambda a: 1, name='pow_op3')
>>> graphop = compose('two_pows_graph', pow_op1, pow_op2, pow_op3)
>>> graphop(a=2)
{'a': 2, 'a_squared': 4, 'a_cubed': 1}
Note
You cannot call the factory again to overwrite the function; you have to use either the fn= keyword of the withset() method, or call once more.
Modifiers on operation needs and provides
Modifiers change the behavior of specific needs or provides.
The needs and provides annotated with modifiers designate, for instance, optional function arguments, or “ghost” sideffects.
class graphtik.modifiers.arg
Annotate a needs to map from its name in the inputs to a different argument-name.
Parameters: fn_arg – The argument-name corresponding to this named-input.
Note
This extra mapping argument is needed either for optionals or for functions with keyword-only arguments (like def func(*, foo, bar): ...), since inputs are normally fed into functions by-position, not by-name.
Example:
In case the name of a function argument differs from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of arg (or optional):
>>> from graphtik import operation, compose, arg
>>> def myadd(a, *, b):
...     return a + b
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=['a', arg("name-in-inputs", "b")],
...               provides="sum")(myadd)
... )
>>> graph
NetworkOperation('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='myadd', needs=['a', arg('name-in-inputs'-->'b')], provides=['sum'], fn='myadd'))
>>> graph.compute({"a": 5, "name-in-inputs": 4})['sum']
9
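The effect of arg can be pictured as a renaming step performed right before the call: the value stored under the input name is passed to the function under the argument name, by keyword. The helper feed_with_args() and the (input_name, fn_arg) tuples below are hypothetical, a sketch of the idea rather than graphtik's code; plain string needs are fed by position, as the note above describes.

```python
from typing import Any, Callable, List, Mapping, Sequence, Tuple, Union

# A need is either a plain input name, or a hypothetical (input_name, fn_arg) pair.
Need = Union[str, Tuple[str, str]]


def feed_with_args(fn: Callable, needs: Sequence[Need], inputs: Mapping[str, Any]) -> Any:
    positional: List[Any] = []
    keywords = {}
    for need in needs:
        if isinstance(need, tuple):
            input_name, fn_arg = need
            # Mapped needs are passed by keyword, under the function's argument name.
            keywords[fn_arg] = inputs[input_name]
        else:
            # Plain needs are passed by position.
            positional.append(inputs[need])
    return fn(*positional, **keywords)


def myadd(a, *, b):
    return a + b


print(feed_with_args(myadd, ["a", ("name-in-inputs", "b")], {"a": 5, "name-in-inputs": 4}))  # 9
```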
class graphtik.modifiers.optional
Annotate optional needs corresponding to defaulted op-function arguments, …
received only if present in the inputs (when the operation is invoked). The value of an optional is passed as a keyword argument to the underlying function.
Example:
>>> from graphtik import operation, compose, optional
>>> def myadd(a, b=0):
...     return a + b
Annotate b as an optional argument (and notice its default value 0):
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=["a", optional("b")],
...               provides="sum")(myadd)
... )
>>> graph
NetworkOperation('mygraph', needs=['a', optional('b')], provides=['sum'], x1 ops: ...
The graph works both with and without b provided in the inputs:
>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}
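The rule for optional needs (passed by keyword, and only when present, so the function's own default applies otherwise) can be sketched like this. call_with_optionals() and its split of needs into two separate lists are hypothetical illustrations, not graphtik internals.

```python
from typing import Any, Callable, Mapping, Sequence


def call_with_optionals(
    fn: Callable,
    needs: Sequence[str],
    optionals: Sequence[str],
    inputs: Mapping[str, Any],
) -> Any:
    # Required needs must exist and are passed by position.
    args = [inputs[n] for n in needs]
    # Optionals are passed by keyword, and only if present in the inputs;
    # otherwise the function's own default value is used.
    kwargs = {n: inputs[n] for n in optionals if n in inputs}
    return fn(*args, **kwargs)


def myadd(a, b=0):
    return a + b


print(call_with_optionals(myadd, ["a"], ["b"], {"a": 5, "b": 4}))  # 9
print(call_with_optionals(myadd, ["a"], ["b"], {"a": 5}))  # 5
```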
class graphtik.modifiers.sideffect
Sideffect dependencies participate in the graph but are not exchanged with functions.
Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (compilation) but they do not interact with the operation’s function; specifically:
- input sideffects must exist in the inputs for an operation to kick-in;
- input sideffects are NOT fed into the function;
- output sideffects are NOT expected from the function;
- output sideffects are stored in the solution.
Their purpose is to describe operations that modify the internal state of some of their arguments (“side-effects”).
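The four rules listed above can be sketched as a filter around the function call: sideffect needs decide whether the operation runs but are not passed in, and sideffect provides are recorded in the solution without expecting anything back from the function. Representing sideffects as "sideffect(...)" strings mirrors the reprs in the examples of this section, but the helpers is_sideffect() and run_with_sideffects() are hypothetical, and storing True for output sideffects is a placeholder choice of this sketch, not what graphtik stores.

```python
from typing import Any, Callable, Dict, Sequence


def is_sideffect(name: str) -> bool:
    # Hypothetical convention for this sketch: sideffects are "sideffect(...)" strings.
    return name.startswith("sideffect(") and name.endswith(")")


def run_with_sideffects(
    fn: Callable,
    needs: Sequence[str],
    provides: Sequence[str],
    solution: Dict[str, Any],
) -> bool:
    """Run fn if all needs are satisfied; return False if it was skipped."""
    # Rule 1: every need, sideffects included, must exist for the op to kick-in.
    if any(n not in solution for n in needs):
        return False
    # Rule 2: sideffect needs are NOT fed into the function.
    result = fn(*[solution[n] for n in needs if not is_sideffect(n)])
    # Rule 3: only real (non-sideffect) provides are expected from the function.
    real = [p for p in provides if not is_sideffect(p)]
    if len(real) == 1:
        solution[real[0]] = result
    elif real:
        solution.update(zip(real, result))
    # Rule 4: sideffect provides are stored in the solution (True is a placeholder).
    for p in provides:
        if is_sideffect(p):
            solution[p] = True
    return True


def add_sum(d):
    # Modifies its argument in place and returns nothing.
    d["sum"] = d["a"] + d["b"]


data = {"a": 5, "b": 2}
sol = {"df": data}
print(run_with_sideffects(add_sum, ["df", "sideffect(df.b)"], ["sideffect(df.sum)"], sol))  # False
sol["sideffect(df.b)"] = 0
print(run_with_sideffects(add_sum, ["df", "sideffect(df.b)"], ["sideffect(df.sum)"], sol))  # True
print(data)  # {'a': 5, 'b': 2, 'sum': 7}
```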
Example:
A typical use-case is to signify columns required to produce new ones in pandas dataframes:
>>> from graphtik import operation, compose, sideffect
>>> # Function appending a new dataframe column from two pre-existing ones.
>>> def addcolumns(df):
...     df['sum'] = df['a'] + df['b']
Designate a, b & sum column names as sideffect arguments:
>>> graph = compose('mygraph',
...     operation(
...         name='addcolumns',
...         needs=['df', sideffect('df.b')],  # sideffect names can be anything
...         provides=[sideffect('df.sum')])(addcolumns)
... )
>>> graph
NetworkOperation('mygraph', needs=['df', 'sideffect(df.b)'], provides=['sideffect(df.sum)'], x1 ops:
  +--FunctionalOperation(name='addcolumns', needs=['df', 'sideffect(df.b)'], provides=['sideffect(df.sum)'], fn='addcolumns'))
>>> import pandas as pd  # doctest: +SKIP
>>> df = pd.DataFrame({'a': [5, 0], 'b': [2, 1]})  # doctest: +SKIP
>>> graph({'df': df})['df']  # doctest: +SKIP
   a  b
0  5  2
1  0  1
We didn’t get the sum column because the b sideffect was unsatisfied. We have to add its key to the inputs (with any value):
>>> graph({'df': df, sideffect("df.b"): 0})['df']  # doctest: +SKIP
   a  b  sum
0  5  2    7
1  0  1    1
Note that regular data in needs and provides do not match same-named sideffects. That is, in the following operation, the price input is different from the sideffect(price) output:
>>> def upd_prices(sales_df, prices):
...     sales_df["Prices"] = prices
>>> operation(fn=upd_prices,
...           name="upd_prices",
...           needs=["sales_df", "price"],
...           provides=[sideffect("price")])
operation(name='upd_prices', needs=['sales_df', 'price'], provides=['sideffect(price)'], fn='upd_prices')
Note
An operation with only sideffect outputs has a function that returns no value at all (like the one above). Such an operation would still be called for its side-effects, if requested in outputs.
Tip
You may associate sideffects with other data to convey their relationships, simply by including their names in the string (in the end, it's just a string), but graphtik will not enforce anything, like:
>>> sideffect("price[sales_df]")
'sideffect(price[sales_df])'
class graphtik.modifiers.vararg
Annotate optional needs to be fed as the op-function’s *args when present in inputs.
See also
Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.
Example:
>>> from graphtik import operation, compose, vararg
>>> def addall(a, *b):
...     return a + sum(b)
Designate b & c as vararg arguments:
>>> graph = compose(
...     'mygraph',
...     operation(
...         name='addall',
...         needs=['a', vararg('b'), vararg('c')],
...         provides='sum'
...     )(addall)
... )
>>> graph
NetworkOperation('mygraph', needs=['a', optional('b'), optional('c')], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='addall', needs=['a', vararg('b'), vararg('c')], provides=['sum'], fn='addall'))
The graph works with and without any of b or c inputs:
>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
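The vararg behaviour can be sketched as collecting whichever vararg inputs are present, in declaration order, and splatting them into the function's *args. call_with_varargs() is a hypothetical helper for illustration only, not part of graphtik.

```python
from typing import Any, Callable, Mapping, Sequence


def call_with_varargs(
    fn: Callable,
    needs: Sequence[str],
    vararg_names: Sequence[str],
    inputs: Mapping[str, Any],
) -> Any:
    args = [inputs[n] for n in needs]
    # Each present vararg contributes one extra positional argument;
    # missing ones are simply skipped.
    args += [inputs[n] for n in vararg_names if n in inputs]
    return fn(*args)


def addall(a, *b):
    return a + sum(b)


print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5, "b": 2, "c": 4}))  # 11
print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5, "b": 2}))  # 7
print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5}))  # 5
```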
class graphtik.modifiers.varargs
Like vararg, naming an optional iterable value in the inputs.
See also
Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.
Example:
>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...     return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...               provides='sum')(enlist)
... )
>>> graph
NetworkOperation('mygraph', needs=['a', optional('b')], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist'))
The graph works with or without b in the inputs:
>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: {'a': 5, 'b': 2989}
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')
Attention
To avoid user mistakes, it does not accept strings (even though they are iterables):
>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: {'a': 5, 'b': 'mistake'}
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')
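One possible way to implement that check is to reject str values and non-iterables before splatting an iterable input into *args, which reproduces both failures above. call_with_varargs_iterables() and its TypeError are hypothetical, chosen for this sketch; graphtik itself reports the problem with the MultiValueError shown above.

```python
from collections.abc import Iterable
from typing import Any, Callable, Mapping, Sequence


def call_with_varargs_iterables(
    fn: Callable,
    needs: Sequence[str],
    varargs_names: Sequence[str],
    inputs: Mapping[str, Any],
) -> Any:
    args = [inputs[n] for n in needs]
    for name in varargs_names:
        if name not in inputs:
            continue  # optional: absent values are skipped
        value = inputs[name]
        # Strings are iterable too, but splatting one is almost always a mistake.
        if isinstance(value, str) or not isinstance(value, Iterable):
            raise TypeError(f"Expected needs[varargs({name!r})] to be non-str iterables!")
        args.extend(value)
    return fn(*args)


def enlist(a, *b):
    return [a] + list(b)


print(call_with_varargs_iterables(enlist, ["a"], ["b"], {"a": 5, "b": [2, 20]}))  # [5, 2, 20]
print(call_with_varargs_iterables(enlist, ["a"], ["b"], {"a": 5}))  # [5]
try:
    call_with_varargs_iterables(enlist, ["a"], ["b"], {"a": 5, "b": "mistake"})
except TypeError as err:
    print(err)  # Expected needs[varargs('b')] to be non-str iterables!
```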