1. Operations

At a high level, an operation is a node in a computation graph. Graphtik uses an Operation class to abstractly represent these computations. The class specifies the requirements for a function to participate in a computation graph: the input data it needs, and the output data it provides.

The FunctionalOperation provides a lightweight wrapper around an arbitrary function to define those specifications.
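As a rough illustration of what such a wrapper records, here is a minimal sketch in plain Python. It does not use graphtik: `MiniOp` and its `compute()` method are hypothetical names, and the way it feeds `needs` by position and names the results after `provides` is a simplification of the behavior described in this chapter, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class MiniOp:
    """Hypothetical stand-in for an operation: a function plus its needs/provides."""

    name: str
    fn: Callable[..., Any]
    needs: List[str]
    provides: List[str]

    def compute(self, named_inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Feed the needed values to the function by position, in `needs` order.
        args = [named_inputs[name] for name in self.needs]
        results = self.fn(*args)
        # A single provide receives the whole return value;
        # several provides are zipped with an iterable of results.
        if len(self.provides) == 1:
            results = [results]
        return dict(zip(self.provides, results))


add_op = MiniOp("add_op", lambda a, b: a + b, needs=["a", "b"], provides=["sum"])
print(add_op.compute({"a": 3, "b": 4}))  # {'sum': 7}
```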

class graphtik.op.FunctionalOperation(fn: Callable, name, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

An operation performing a callable (i.e. a function, a method, or a lambda).

Tip

Use the operation() builder class to build instances of this class instead.

__call__(*args, **kwargs)[source]

Call self as a function.

__init__(fn: Callable, name, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, *, parents: Tuple = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

Build a new operation out of some function and its requirements.

See operation for the full documentation of parameters.

name[source]

a name for the operation (e.g. ‘conv1’, ‘sum’, etc.); it will be prefixed by parents.

needs[source]

Names of input data objects this operation requires.

provides[source]

Names of the real output values the underlying function provides (without aliases, with(!) sideffects)

NOTE that the instance attribute eventually includes aliases & sideffects.

real_provides[source]

Value names the underlying function provides (without aliases, with(!) sideffects).

FIXME: real_provides not sure what it does with sideffects.

aliases[source]

an optional mapping of real provides to additional ones, together comprising this operation's provides.

parents[source]

a tuple with the names of the parents, prefixing name, but also kept for equality/hash check.

rescheduled[source]

If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).

endured[source]

If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.

parallel[source]

execute in parallel

marshalled[source]

If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).

returns_dict[source]

if true, it means the fn returns a dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides)

node_props[source]

Added as-is into the NetworkX graph, and you may filter operations by NetworkOperation.withset(). Plot rendering is also affected if they match Graphviz properties, unless they start with underscore (_).

compute(named_inputs, outputs=None) → dict[source]

Compute (optional) asked outputs for the given named_inputs.

It is called by Network. End-users should simply call the operation with named_inputs as kwargs.

Parameters

named_inputs – the input values with which to feed the computation.

Returns list

Should return a list of values representing the results of running the feed-forward computation on named_inputs.
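How a function's return value might be matched against provides, and what returns_dict changes, can be sketched in plain Python as below. This is an illustration under stated assumptions: `zip_results` is a hypothetical helper, not part of graphtik, and it ignores aliases and sideffects.

```python
from typing import Any, Dict, Sequence


def zip_results(results: Any, provides: Sequence[str], returns_dict: bool = False) -> Dict[str, Any]:
    """Hypothetical helper: map a function's return value onto its provides."""
    if returns_dict:
        # The function already named its outputs: take the dictionary as-is.
        return dict(results)
    if not provides:
        # Nothing to name (e.g. only sideffect outputs): the return value is ignored.
        return {}
    if len(provides) == 1:
        # A single provide receives the whole return value.
        return {provides[0]: results}
    # Several provides: the function must return an iterable of matching length.
    results = list(results)
    if len(results) != len(provides):
        raise ValueError(f"expected {len(provides)} results, got {len(results)}")
    return dict(zip(provides, results))


print(zip_results(divmod(7, 2), ["quotient", "remainder"]))  # {'quotient': 3, 'remainder': 1}
# A dict-returning function may name just a subset, as a rescheduled operation might.
print(zip_results({"quotient": 3}, ["quotient", "remainder"], returns_dict=True))  # {'quotient': 3}
```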

The operation builder factory

There is a better way to instantiate a FunctionalOperation than constructing it directly: use the operation builder class:

class graphtik.operation(fn: Callable = None, *, name=None, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None)[source]

A builder for graph-operations wrapping functions.

Parameters
  • fn – The callable underlying this operation. This does not need to be specified when the operation object is instantiated and can instead be set via __call__ later.

  • name (str) – The name of the operation in the computation graph.

  • needs

    The list of (positionally ordered) names of the data needed by the operation to receive as inputs, roughly corresponding to the arguments of the underlying fn.

    See also needs & modifiers.

  • provides

    Names of output data this operation provides, which must correspond to the returned values of the fn. If more than one is given, those must be returned in an iterable, unless returns_dict is true, in which case a dictionary with (at least) as many elements must be returned.

    See also provides & modifiers.

  • aliases – an optional mapping of provides to additional ones

  • rescheduled – If true, the underlying callable may produce a subset of provides, and the plan must then reschedule after the operation has executed. In that case, it makes more sense for the callable to return a dictionary (see returns_dict).

  • endured – If true, the solution will reschedule even if the callable fails; ignored if endurance is enabled globally.

  • parallel – execute in parallel

  • marshalled – If true, the operation will be marshalled while computed, along with its inputs & outputs (useful when run in parallel with a process pool).

  • returns_dict – if true, it means the fn returns a dictionary with all provides, and no further processing is done on them (i.e. the returned output-values are not zipped with provides)

  • node_props – Added as-is into the NetworkX graph, and you may filter operations by NetworkOperation.withset(). Plot rendering is also affected if they match Graphviz properties, unless they start with underscore (_).

Returns

when called, it returns a FunctionalOperation

Example:

This is an example of its use, based on the “builder pattern”:

>>> from graphtik import operation
>>> opb = operation(name='add_op')
>>> opb.withset(needs=['a', 'b'])
operation(name='add_op', needs=['a', 'b'], provides=[], fn=None)
>>> opb.withset(provides='SUM', fn=sum)
operation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')

You may keep calling withset() till you invoke a final __call__() on the builder; then you get the actual FunctionalOperation instance:

>>> # Create `Operation` and overwrite function at the last moment.
>>> opb(sum)
FunctionalOperation(name='add_op', needs=['a', 'b'], provides=['SUM'], fn='sum')

Tip

Remember to call the builder once more at the end, to get the actual operation instance.

__call__(fn: Callable = None, *, name=None, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None) → graphtik.op.FunctionalOperation[source]

This enables operation to act as a decorator or as a functional operation, for example:

from graphtik import operation

@operation(name='myadd1', needs=['a', 'b'], provides=['c'])
def myadd(a, b):
    return a + b

or:

def myadd(a, b):
    return a + b

myadd_op = operation(name='myadd1', needs=['a', 'b'], provides=['c'])(myadd)

Parameters

fn – The function to be used by this operation.

Returns

Returns an operation instance that can be called as a function or composed into a computation graph.

withset(*, fn: Callable = None, name=None, needs: Union[Collection, str, None] = None, provides: Union[Collection, str, None] = None, aliases: Mapping = None, rescheduled=None, endured=None, parallel=None, marshalled=None, returns_dict=None, node_props: Mapping = None) → graphtik.op.operation[source]

See operation for the arguments.
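The builder pattern itself can be sketched in a few lines of plain Python. `MiniBuilder` is a hypothetical class, not graphtik's operation; it returns a `SimpleNamespace` as a stand-in for a real operation, and its withset() returns a fresh builder, which is a choice of this sketch and not necessarily how graphtik's own withset() behaves.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


class MiniBuilder:
    """Hypothetical builder: collects operation settings until a function arrives."""

    def __init__(self, fn: Optional[Callable] = None, **settings: Any) -> None:
        self.fn = fn
        self.settings: Dict[str, Any] = settings

    def withset(self, **settings: Any) -> "MiniBuilder":
        # Merge non-None settings into a *new* builder, leaving this one untouched.
        given = {k: v for k, v in settings.items() if v is not None}
        fn = given.pop("fn", self.fn)
        return MiniBuilder(fn, **{**self.settings, **given})

    def __call__(self, fn: Optional[Callable] = None, **settings: Any) -> SimpleNamespace:
        # Final step: apply last-moment settings and build the (stand-in) operation.
        final = self.withset(fn=fn, **settings)
        if final.fn is None:
            raise ValueError("a function is required to build the operation")
        return SimpleNamespace(fn=final.fn, **final.settings)


# Builder style: accumulate settings, then call with the function.
opb = MiniBuilder(name="add_op").withset(needs=["a", "b"])
add_op = opb.withset(provides=["SUM"])(sum)
print(add_op.name, add_op.needs, add_op.provides)  # add_op ['a', 'b'] ['SUM']


# Decorator style: the decorated name is bound to the built operation.
@MiniBuilder(name="myadd1", needs=["a", "b"], provides=["c"])
def myadd(a, b):
    return a + b


print(myadd.name, myadd.fn(1, 2))  # myadd1 3
```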

Operations are just functions

At the heart of each operation is just a function, any arbitrary function. Indeed, you can instantiate an operation with a function and then call it just like the original function, e.g.:

>>> from operator import add
>>> from graphtik import operation
>>> add_op = operation(name='add_op', needs=['a', 'b'], provides=['a_plus_b'])(add)
>>> add_op(3, 4) == add(3, 4)
True

Specifying graph structure: provides and needs

Of course, each operation is more than just a function. It is a node in a computation graph, depending on other nodes in the graph for input data and supplying output data that may be used by other nodes in the graph (or as a graph output). This graph structure is specified via the provides and needs arguments to the operation constructor. Specifically:

  • provides: this argument names the outputs (i.e. the returned values) of a given operation. If multiple outputs are specified by provides, then the function comprising the operation must return an iterable.

  • needs: this argument names data that is needed as input by a given operation. Each piece of data named in needs may either be provided by another operation in the same graph (i.e. specified in the provides argument of that operation), or it may be specified as a named input to a graph computation (more on graph computations here).

When many operations are composed into a computation graph (see Graph Composition for more on that), Graphtik matches up the values in their needs and provides to form the edges of that graph.
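A minimal sketch of that matching, assuming Python 3.9+ for the standard-library graphlib module: each need becomes a data-to-operation edge, each provide an operation-to-data edge, and a topological sort of the result yields an execution order. The `ops` table reuses the operation names from the Quick start script; the code is an illustration, not graphtik's implementation.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical declarations: operation name -> (needs, provides)
ops = {
    "mul1": (["a", "b"], ["ab"]),
    "sub1": (["a", "ab"], ["a_minus_ab"]),
    "abspow1": (["a_minus_ab"], ["abs_a_minus_ab_cubed"]),
}

# Each need becomes a data -> operation edge, each provide an operation -> data edge.
edges = []
for op_name, (needs, provides) in ops.items():
    edges += [(data, op_name) for data in needs]
    edges += [(op_name, data) for data in provides]

# TopologicalSorter expects {node: predecessors}; collect them from the edges.
predecessors = {}
for src, dst in edges:
    predecessors.setdefault(dst, set()).add(src)

# Keep only the operation nodes to get an execution order.
order = [node for node in TopologicalSorter(predecessors).static_order() if node in ops]
print(order)  # ['mul1', 'sub1', 'abspow1']
```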

Let’s look again at the operations from the script in Quick start, for example:

>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> # Computes |a|^p.
>>> def abspow(a, p):
...   c = abs(a) ** p
...   return c
>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
...    operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
...    operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub),
...    operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
...    (partial(abspow, p=3))
... )

Tip

Notice the use of functools.partial() to set parameter p to a constant value.

The needs and provides arguments to the operations in this script define a computation graph that looks like this (where ovals are operations and squares/houses are data):

Tip

See Plotting on how to make diagrams like this.

Instantiating operations

There are several ways to instantiate an operation, each of which might be more suitable for different scenarios.

Decorator specification

If you are defining your computation graph and the functions that comprise it all in the same script, the decorator specification of operation instances might be particularly useful, as it allows you to assign computation graph structure to functions as they are defined. Here’s an example:

>>> from graphtik import operation, compose
>>> @operation(name='foo_op', needs=['a', 'b', 'c'], provides='foo')
... def foo(a, b, c):
...   return c * (a + b)
>>> graphop = compose('foo_graph', foo)

Functional specification

If the functions underlying your computation graph operations are defined elsewhere than the script in which your graph itself is defined (e.g. they are defined in another module, or they are system functions), you can use the functional specification of operation instances:

>>> from operator import add, mul
>>> from graphtik import operation, compose
>>> add_op = operation(name='add_op', needs=['a', 'b'], provides='sum')(add)
>>> mul_op = operation(name='mul_op', needs=['c', 'sum'], provides='product')(mul)
>>> graphop = compose('add_mul_graph', add_op, mul_op)

The functional specification is also useful if you want to create multiple operation instances from the same function, perhaps with different parameter values, e.g.:

>>> from functools import partial
>>> def mypow(a, p=2):
...    return a ** p
>>> pow_op1 = operation(name='pow_op1', needs=['a'], provides='a_squared')(mypow)
>>> pow_op2 = operation(name='pow_op2', needs=['a'], provides='a_cubed')(partial(mypow, p=3))
>>> graphop = compose('two_pows_graph', pow_op1, pow_op2)

A slightly different approach can be used here to accomplish the same effect by creating an operation “builder pattern”:

>>> def mypow(a, p=2):
...    return a ** p
>>> pow_op_factory = operation(mypow, needs=['a'], provides='a_squared')
>>> pow_op1 = pow_op_factory(name='pow_op1')
>>> pow_op2 = pow_op_factory.withset(name='pow_op2', provides='a_cubed')(partial(mypow, p=3))
>>> pow_op3 = pow_op_factory(lambda a: 1, name='pow_op3')
>>> graphop = compose('two_pows_graph', pow_op1, pow_op2, pow_op3)
>>> graphop(a=2)
{'a': 2, 'a_squared': 4, 'a_cubed': 1}

Note

You cannot call the factory again to overwrite the function; you have to use either the fn= keyword of the withset() method, or call once more.

Modifiers on operation needs and provides

Modifiers change the behavior of specific needs or provides.

The needs and provides annotated with modifiers designate, for instance, optional function arguments, or “ghost” sideffects.

class graphtik.modifiers.arg[source]

Annotate a needs to map from its name in the inputs to a different argument-name.

Parameters

fn_arg

The argument-name corresponding to this named-input.

Note

This extra mapping argument is needed either for optionals or for functions with keyword-only arguments (like def func(*, foo, bar): ...), since inputs are normally fed into functions by-position, not by-name.

Example:

In case the name of the function arguments is different from the name in the inputs (or just because the name in the inputs is not a valid argument-name), you may map it with the 2nd argument of arg (or optional):

>>> from graphtik import operation, compose, arg, debug
>>> def myadd(a, *, b):
...    return a + b
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=['a', arg("name-in-inputs", "b")],
...               provides="sum")(myadd)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph', needs=['a', 'name-in-inputs'], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='myadd',
                         needs=['a',
                         arg('name-in-inputs'-->'b')],
                         provides=['sum'],
                         fn='myadd'))
>>> graph.compute({"a": 5, "name-in-inputs": 4})['sum']
9
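The mapping that arg performs can be pictured with the plain-Python sketch below. The `call_with_needs` helper and its `(input_name, fn_arg)` tuples are hypothetical stand-ins for arg, assuming that plain needs are fed by position and renamed ones by keyword, as the note above describes.

```python
from typing import Any, Callable, Dict, List, Tuple, Union

# A need is either a plain input name, fed by position, or a hypothetical
# (input_name, fn_arg) pair, fed as the keyword argument `fn_arg`.
Need = Union[str, Tuple[str, str]]


def call_with_needs(fn: Callable, needs: List[Need], inputs: Dict[str, Any]) -> Any:
    args: List[Any] = []
    kwargs: Dict[str, Any] = {}
    for need in needs:
        if isinstance(need, tuple):
            input_name, fn_arg = need
            kwargs[fn_arg] = inputs[input_name]  # renamed need: passed by keyword
        else:
            args.append(inputs[need])  # plain need: passed by position
    return fn(*args, **kwargs)


def myadd(a, *, b):
    return a + b


print(call_with_needs(myadd, ["a", ("name-in-inputs", "b")], {"a": 5, "name-in-inputs": 4}))  # 9
```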

class graphtik.modifiers.optional[source]

Annotate optional needs corresponding to defaulted op-function arguments, received only if present in the inputs (when the operation is invoked). The value of an optional is passed as a keyword argument to the underlying function.

Example:

>>> from graphtik import operation, compose, optional
>>> def myadd(a, b=0):
...    return a + b

Annotate b as an optional argument (and notice its default value 0):

>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=["a", optional("b")],
...               provides="sum")(myadd)
... )
>>> graph
NetworkOperation('mygraph',
                 needs=['a', optional('b')],
                 provides=['sum'],
                 x1 ops: myadd)

The graph works both with and without b provided in the inputs:

>>> graph(a=5, b=4)['sum']
9
>>> graph(a=5)
{'a': 5, 'sum': 5}

Like arg, you may map an input-name to a different function-argument:

>>> from graphtik import debug
>>> graph = compose('mygraph',
...     operation(name='myadd',
...               needs=['a', optional("quasi-real", "b")],
...               provides="sum")(myadd)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph', needs=['a', optional('quasi-real')], provides=['sum'], x1 ops:
  +--FunctionalOperation(name='myadd', needs=['a', optional('quasi-real'-->'b')], provides=['sum'], fn='myadd'))
>>> graph.compute({"a": 5, "quasi-real": 4})['sum']
9
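The effect of optional can be approximated in plain Python as below: required needs are passed by position, and each optional is passed as a keyword argument only when its name is present in the inputs, so the function's own default applies otherwise. `call_with_optionals` is a hypothetical helper, not graphtik API.

```python
from typing import Any, Callable, Dict, List


def call_with_optionals(fn: Callable, needs: List[str], optionals: Dict[str, str],
                        inputs: Dict[str, Any]) -> Any:
    """Hypothetical: `optionals` maps each input name to the function argument it feeds."""
    args = [inputs[name] for name in needs]
    # Optionals go by keyword, and only when present in the inputs.
    kwargs = {fn_arg: inputs[name] for name, fn_arg in optionals.items() if name in inputs}
    return fn(*args, **kwargs)


def myadd(a, b=0):
    return a + b


print(call_with_optionals(myadd, ["a"], {"quasi-real": "b"}, {"a": 5, "quasi-real": 4}))  # 9
print(call_with_optionals(myadd, ["a"], {"quasi-real": "b"}, {"a": 5}))  # 5
```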

class graphtik.modifiers.sideffect[source]

Sideffect dependencies participate in the graph but are not exchanged with functions.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (compilation) but they do not interact with the operation’s function; specifically:

  • input sideffects must exist in the inputs for an operation to kick-in;

  • input sideffects are NOT fed into the function;

  • output sideffects are NOT expected from the function;

  • output sideffects are stored in the solution.

Their purpose is to describe operations that modify the internal state of some of their arguments (“side-effects”).
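The four rules above can be sketched in plain Python. `run_with_sideffects` is hypothetical, it leaves regular provides out for brevity, and it records output sideffects with a placeholder value of True, which is an assumption of the sketch rather than what graphtik stores. A plain dict stands in for a pandas dataframe so the example runs without pandas.

```python
from typing import Any, Callable, Dict, List, Set


def run_with_sideffects(fn: Callable, needs: List[str], side_needs: Set[str],
                        side_provides: Set[str], solution: Dict[str, Any]) -> bool:
    """Hypothetical runner; regular provides are left out for brevity."""
    # Input sideffects must exist in the solution for the operation to kick in.
    if any(name not in solution for name in [*needs, *side_needs]):
        return False
    # Only regular needs are fed to the function; sideffect names never reach it.
    fn(*(solution[name] for name in needs))
    # Output sideffects are not expected from the function, but are still recorded
    # (here with a placeholder value of True, an assumption of this sketch).
    for name in side_provides:
        solution[name] = True
    return True


def addcolumns(df):
    df["sum"] = [x + y for x, y in zip(df["a"], df["b"])]


df = {"a": [5, 0], "b": [2, 1]}  # a plain dict standing in for a pandas DataFrame
solution = {"df": df}
print(run_with_sideffects(addcolumns, ["df"], {"df.b"}, {"df.sum"}, solution))  # False
solution["df.b"] = 0  # satisfy the input sideffect with any value
print(run_with_sideffects(addcolumns, ["df"], {"df.b"}, {"df.sum"}, solution))  # True
print(df["sum"])  # [7, 1]
```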

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes:

>>> from graphtik import operation, compose, sideffect
>>> # Function appending a new dataframe column from two pre-existing ones.
>>> def addcolumns(df):
...    df['sum'] = df['a'] + df['b']

Designate the b & sum column names as sideffect arguments:

>>> graph = compose('mygraph',
...     operation(
...         name='addcolumns',
...         needs=['df', sideffect('df.b')],  # sideffect names can be anything
...         provides=[sideffect('df.sum')])(addcolumns)
... )
>>> graph
NetworkOperation('mygraph', needs=['df', 'sideffect(df.b)'],
                 provides=['sideffect(df.sum)'], x1 ops: addcolumns)
>>> import pandas as pd                             # doctest: +SKIP
>>> df = pd.DataFrame({'a': [5, 0], 'b': [2, 1]})   # doctest: +SKIP
>>> graph({'df': df})['df']                         # doctest: +SKIP
        a       b
0       5       2
1       0       1

We didn’t get the sum column because the b sideffect was unsatisfied. We have to add its key to the inputs (with any value):

>>> graph({'df': df, sideffect("df.b"): 0})['df']   # doctest: +SKIP
        a       b       sum
0       5       2       7
1       0       1       1

Note that regular data in needs and provides do not match same-named sideffects. That is, in the following operation, the price input is different from the sideffect(price) output:

>>> def upd_prices(sales_df, prices):
...     sales_df["Prices"] = prices
>>> operation(fn=upd_prices,
...           name="upd_prices",
...           needs=["sales_df", "price"],
...           provides=[sideffect("price")])
operation(name='upd_prices', needs=['sales_df', 'price'],
          provides=['sideffect(price)'], fn='upd_prices')

Note

An operation with only sideffect outputs has a function that returns no value at all (like the one above). Such an operation is still called for its side-effects, if they are requested in the outputs.

Tip

You may associate sideffects with other data to convey their relationships, simply by including their names in the string (in the end, it’s just a string); graphtik enforces nothing about them, e.g.:

>>> sideffect("price[sales_df]")
'sideffect(price[sales_df])'

class graphtik.modifiers.vararg[source]

Annotate optional needs to be fed as the op-function’s *args when present in the inputs.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, vararg, debug
>>> def addall(a, *b):
...    return a + sum(b)

Designate b & c as vararg arguments:

>>> graph = compose(
...     'mygraph',
...     operation(
...               name='addall',
...               needs=['a', vararg('b'), vararg('c')],
...               provides='sum'
...     )(addall)
... )
>>> with debug(True):
...     graph
NetworkOperation('mygraph',
                 needs=['a', optional('b'), optional('c')],
                 provides=['sum'],
                 x1 ops:
  +--FunctionalOperation(name='addall', needs=['a', vararg('b'), vararg('c')], provides=['sum'], fn='addall'))

The graph works with and without any of b or c inputs:

>>> graph(a=5, b=2, c=4)['sum']
11
>>> graph(a=5, b=2)
{'a': 5, 'b': 2, 'sum': 7}
>>> graph(a=5)
{'a': 5, 'sum': 5}
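A plain-Python approximation of vararg: regular needs are passed first, then the value of every vararg name present in the inputs is appended to the positional arguments, and missing ones are simply skipped. `call_with_varargs` is a hypothetical helper, not part of graphtik.

```python
from typing import Any, Callable, Dict, List


def call_with_varargs(fn: Callable, needs: List[str], varargs: List[str],
                      inputs: Dict[str, Any]) -> Any:
    """Hypothetical: regular needs first, then each vararg present in the inputs."""
    args = [inputs[name] for name in needs]
    args += [inputs[name] for name in varargs if name in inputs]  # absent ones are skipped
    return fn(*args)


def addall(a, *b):
    return a + sum(b)


print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5, "b": 2, "c": 4}))  # 11
print(call_with_varargs(addall, ["a"], ["b", "c"], {"a": 5}))  # 5
```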

class graphtik.modifiers.varargs[source]

Like vararg, naming an optional iterable value in the inputs.

See also

Consult also the example test-case in: test/test_op.py:test_varargs(), in the full sources of the project.

Example:

>>> from graphtik import operation, compose, varargs
>>> def enlist(a, *b):
...    return [a] + list(b)
>>> graph = compose('mygraph',
...     operation(name='enlist', needs=['a', varargs('b')],
...     provides='sum')(enlist)
... )
>>> graph
NetworkOperation('mygraph',
                 needs=['a', optional('b')],
                 provides=['sum'],
                 x1 ops: enlist)

The graph works with or without b in the inputs:

>>> graph(a=5, b=[2, 20])['sum']
[5, 2, 20]
>>> graph(a=5)
{'a': 5, 'sum': [5]}
>>> graph(a=5, b=0xBAD)
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')

Attention

To avoid user mistakes, varargs does not accept strings (even though strings are iterables):

>>> graph(a=5, b="mistake")
Traceback (most recent call last):
...
graphtik.base.MultiValueError: Failed preparing needs:
    1. Expected needs[varargs('b')] to be non-str iterables!
    +++inputs: ['a', 'b']
    +++FunctionalOperation(name='enlist', needs=['a', varargs('b')], provides=['sum'], fn='enlist')
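Likewise, varargs can be approximated by expanding a single optional iterable into the positional arguments while refusing strings, which mirrors the errors shown above. `call_with_varargs_iterable` is hypothetical, and it raises a plain TypeError rather than graphtik's MultiValueError.

```python
from collections.abc import Iterable
from typing import Any, Callable, Dict, List


def call_with_varargs_iterable(fn: Callable, needs: List[str], varargs_name: str,
                               inputs: Dict[str, Any]) -> Any:
    """Hypothetical: expand one optional iterable input into *args, rejecting strings."""
    args = [inputs[name] for name in needs]
    if varargs_name in inputs:
        value = inputs[varargs_name]
        # Strings are iterable too, but expanding one char-by-char is almost always a mistake.
        if isinstance(value, str) or not isinstance(value, Iterable):
            raise TypeError(f"Expected {varargs_name!r} to be a non-str iterable, got {value!r}")
        args.extend(value)
    return fn(*args)


def enlist(a, *b):
    return [a] + list(b)


print(call_with_varargs_iterable(enlist, ["a"], "b", {"a": 5, "b": [2, 20]}))  # [5, 2, 20]
print(call_with_varargs_iterable(enlist, ["a"], "b", {"a": 5}))  # [5]
```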