2. Pipelines

Graphtik’s operation.compose() factory handles the work of tying together operation instances into a runnable computation graph.

The simplest use case is to assemble a collection of individual operations into a runnable computation graph. The example script from Quick start illustrates this well:

>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> def abspow(a, p):
...    """Computes |a|^p. """
...    c = abs(a) ** p
...    return c

The following call to compose() yields a runnable computation graph (in the plots, circles are operations, squares are data, and octagons are parameters):

>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
...    operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
...    operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub),
...    operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
...    (partial(abspow, p=3))
... )

This yields a graph which looks like this (see Plotting):

>>> graphop.plot('calc_power.svg')  

Figure: graphop

Running a pipeline

The graph composed above can be run by simply calling it with keyword arguments whose names correspond to the named dependencies (needs & provides); alternatively, pass a dictionary of such values to its compute() method:

>>> # Run the graph and request all of the outputs.
>>> out = graphop(a=2, b=5)
>>> out
{'a': 2, 'b': 5, 'ab': 10, 'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}
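Conceptually, running the pipeline means evaluating each operation once its needs are available and storing its provides by name. The following plain-Python sketch mimics that for this graph; it is not graphtik's implementation, and the `OPERATIONS` table and `run()` helper are hypothetical:

```python
from functools import partial
from operator import mul, sub


def abspow(a, p):
    """Computes |a|^p."""
    return abs(a) ** p


# Hypothetical toy table: (name, needs, provides, fn), listed so that every
# operation comes after the operations producing its needs.
OPERATIONS = [
    ("mul1", ["a", "b"], ["ab"], mul),
    ("sub1", ["a", "ab"], ["a_minus_ab"], sub),
    ("abspow1", ["a_minus_ab"], ["abs_a_minus_ab_cubed"], partial(abspow, p=3)),
]


def run(inputs):
    """Evaluate each operation in turn, storing its outputs by name."""
    solution = dict(inputs)
    for _name, needs, provides, fn in OPERATIONS:
        result = fn(*(solution[n] for n in needs))
        # Single-output operations return a bare value; wrap it for zip().
        values = [result] if len(provides) == 1 else result
        solution.update(zip(provides, values))
    return solution


print(run({"a": 2, "b": 5}))
# {'a': 2, 'b': 5, 'ab': 10, 'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}
```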

You may plot the solution:

>>> out.plot('a_solution.svg')  

Figure: the solution of the graph

Producing a subset of outputs

By default, calling a graph-operation on a set of inputs will yield all of that graph’s outputs. You can use the outputs parameter to request only a subset. For example, if graphop is as above:

>>> # Run the graph-operation and request a subset of the outputs.
>>> out = graphop.compute({'a': 2, 'b': 5}, outputs="a_minus_ab")
>>> out
{'a_minus_ab': -8}

When asked for a subset of the graph’s outputs, Graphtik does 2 things:

  • it prunes any operations that are not on the path from given inputs to the requested outputs (e.g. the abspow1 operation, above, is not executed);

  • it evicts any intermediate data from the solution as soon as it is no longer needed.

You may see (2) in action by including the sequence of execution steps in the plot:

>>> from graphtik.plot import Theme

>>> dot = out.plot(theme=Theme(include_steps=True))
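Both behaviors can be sketched in plain Python, assuming a hypothetical `OPS` table of single-output operations listed in dependency order: pruning walks backwards from the requested outputs, and eviction drops every value that no remaining step needs. This illustrates the idea only; it is not graphtik's planner:

```python
from operator import mul, sub

# Hypothetical toy pipeline: name -> (needs, provides, fn), one output each.
OPS = {
    "mul1": (["a", "b"], ["ab"], mul),
    "sub1": (["a", "ab"], ["a_minus_ab"], sub),
    "abspow1": (["a_minus_ab"], ["abs_a_minus_ab_cubed"], lambda x: abs(x) ** 3),
}
ORDER = ["mul1", "sub1", "abspow1"]  # a valid topological order


def prune(outputs):
    """Keep only the operations needed, transitively, for the outputs."""
    wanted, kept = set(outputs), []
    for name in reversed(ORDER):
        needs, provides, _ = OPS[name]
        if wanted & set(provides):
            kept.append(name)
            wanted |= set(needs)
    return kept[::-1]


def toy_compute(inputs, outputs):
    plan = prune(outputs)
    solution = dict(inputs)
    for i, name in enumerate(plan):
        needs, provides, fn = OPS[name]
        solution[provides[0]] = fn(*(solution[n] for n in needs))
        # Evict values that no later step needs, unless they were requested.
        still_needed = set(outputs)
        for later in plan[i + 1:]:
            still_needed |= set(OPS[later][0])
        for key in list(solution):
            if key not in still_needed:
                del solution[key]
    return plan, solution


print(toy_compute({"a": 2, "b": 5}, ["a_minus_ab"]))
# (['mul1', 'sub1'], {'a_minus_ab': -8})
```

Note how abspow1 never enters the plan, and how b is dropped right after mul1 consumes it.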

Short-circuiting a pipeline

You can short-circuit a graph computation, making certain inputs unnecessary, by providing a value in the graph that is further downstream in the graph than those inputs. For example, in the graph-operation we’ve been working with, you could provide the value of a_minus_ab to make the inputs a and b unnecessary:

>>> # Run the graph-operation, giving a downstream value instead of a and b.
>>> out = graphop(a_minus_ab=-8)
>>> out
{'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}

When you do this, any operation nodes that are not on a path from the downstream input to the requested outputs (i.e. predecessors of the downstream input) are not computed. For example, the mul1 and sub1 operations are not executed here.
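A toy planner can mimic short-circuiting by stopping its backward trace at any value the caller already supplied. The `OPS` table, `plan_for()` and `run()` below are hypothetical helpers, not graphtik API:

```python
from operator import mul, sub

# Hypothetical toy pipeline: name -> (needs, provides, fn), one output each.
OPS = {
    "mul1": (["a", "b"], ["ab"], mul),
    "sub1": (["a", "ab"], ["a_minus_ab"], sub),
    "abspow1": (["a_minus_ab"], ["abs_a_minus_ab_cubed"], lambda x: abs(x) ** 3),
}
ORDER = ["mul1", "sub1", "abspow1"]  # a valid topological order


def plan_for(inputs, outputs):
    """Trace back from the outputs, stopping at values already given."""
    given = set(inputs)
    wanted, kept = set(outputs) - given, []
    for name in reversed(ORDER):
        needs, provides, _ = OPS[name]
        if wanted & set(provides):
            kept.append(name)
            wanted |= set(needs) - given
    return kept[::-1]


def run(inputs, outputs):
    solution = dict(inputs)
    for name in plan_for(inputs, outputs):
        needs, provides, fn = OPS[name]
        solution[provides[0]] = fn(*(solution[n] for n in needs))
    return solution


print(plan_for({"a_minus_ab": -8}, ["abs_a_minus_ab_cubed"]))  # ['abspow1']
print(run({"a_minus_ab": -8}, ["abs_a_minus_ab_cubed"]))
# {'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}
```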

This can be useful if you have a graph-operation that accepts alternative forms of the same input. For example, if your graph-operation requires a PIL.Image as input, you could allow your graph to be run in an API server by adding an earlier operation that accepts as input a string of raw image data and converts that data into the needed PIL.Image. Then, you can either provide the raw image data string as input, or you can provide the PIL.Image if you have it and skip providing the image data string.

Extending pipelines

Sometimes we have existing computation graph(s) to which we want to append operations or other pipelines.

Combining

This is simple, since compose() can combine whole pipelines along with individual operations.

For example, if we have the above graph, we can add another operation to it and create a new graph:

>>> # Add another subtraction operation to the graph.
>>> bigger_graph = compose("bigger_graph",
...    graphop,
...    operation(name="sub2", needs=["a_minus_ab", "c"], provides="a_minus_ab_minus_c")(sub)
... )

Notice that the original pipeline is preserved intact in an “isolated” cluster, and its operations have been prefixed by the name of that pipeline.
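The renaming can be pictured with a minimal sketch; the `Op`/`Pipeline` classes, the `combine()` helper, and the `.` separator between pipeline and operation names are all assumptions made for illustration:

```python
from dataclasses import dataclass, field


@dataclass
class Op:
    """Hypothetical stand-in for an operation: its name and dependencies."""
    name: str
    needs: list
    provides: list


@dataclass
class Pipeline:
    """Hypothetical stand-in for a composed pipeline."""
    name: str
    ops: list = field(default_factory=list)


def combine(name, *members):
    """Flatten members into one pipeline, prefixing nested pipelines' ops."""
    ops = []
    for member in members:
        if isinstance(member, Pipeline):
            ops.extend(
                Op(f"{member.name}.{op.name}", op.needs, op.provides)
                for op in member.ops
            )
        else:
            ops.append(member)
    return Pipeline(name, ops)


graphop = Pipeline("graphop", [
    Op("mul1", ["a", "b"], ["ab"]),
    Op("sub1", ["a", "ab"], ["a_minus_ab"]),
    Op("abspow1", ["a_minus_ab"], ["abs_a_minus_ab_cubed"]),
])
bigger = combine(
    "bigger_graph", graphop, Op("sub2", ["a_minus_ab", "c"], ["a_minus_ab_minus_c"])
)
print([op.name for op in bigger.ops])
# ['graphop.mul1', 'graphop.sub1', 'graphop.abspow1', 'sub2']
```

Only operation names change in this sketch; data names stay the same, so sub2 still connects to a_minus_ab.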

Run the graph and print the output:

>>> sol = bigger_graph.compute({'a': 2, 'b': 5, 'c': 5},
...                            outputs=["a_minus_ab_minus_c"])
>>> sol
{'a_minus_ab_minus_c': -13}
>>> dot = sol.plot(clusters=True)

Tip

We had to plot with clusters=True to prevent the plan from inserting the “after pruning” cluster (see PlotArgs.clusters).

Merging

Sometimes we have computation graphs – perhaps ones that share operations – and we want to merge them into one.

This is doable with compose(..., merge=True). Any identically-named operations are consolidated into a single node, where the operation added later in the call (further to the right) wins.
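The "rightmost wins" rule behaves like successive updates of an ordered mapping keyed by operation name. A minimal sketch, assuming pipelines are reduced to hypothetical `(pipeline_name, [op_names])` pairs and that each merged entry records which pipeline it came from:

```python
def merge_ops(*pipelines):
    """Consolidate same-named operations; the later (rightmost) one wins."""
    merged = {}
    for pipeline_name, op_names in pipelines:
        for op_name in op_names:
            # A later duplicate replaces the earlier value; Python dicts keep
            # the key at the position where it was first inserted.
            merged[op_name] = pipeline_name
    return merged


merged = merge_ops(
    ("graphop", ["mul1", "sub1", "abspow1"]),
    ("another_graph", ["mul1", "mul2"]),
)
print(merged)
# {'mul1': 'another_graph', 'sub1': 'graphop', 'abspow1': 'graphop', 'mul2': 'another_graph'}
```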

For example, let’s say we have graphop, as in the examples above, along with this graph:

>>> another_graph = compose("another_graph",
...    operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
...    operation(name="mul2", needs=["c", "ab"], provides=["cab"])(mul)
... )
>>> another_graph
NetworkOperation('another_graph', needs=['a', 'b', 'c', 'ab'], provides=['ab', 'cab'], x2 ops: mul1, mul2)

Figure: another_graph

We can merge graphop and another_graph like so, avoiding a redundant mul1 operation:

Note

The names of the graphs must differ.

>>> merged_graph = compose("merged_graph", graphop, another_graph, merge=True)
>>> print(merged_graph)
NetworkOperation('merged_graph',
                  needs=['a', 'b', 'ab', 'a_minus_ab', 'c'],
                  provides=['ab', 'a_minus_ab', 'abs_a_minus_ab_cubed', 'cab'],
                  x4 ops:  mul1, sub1, abspow1, mul2)

As always, we can run computations with this graph by simply calling it:

>>> sol = merged_graph.compute({"a": 2, "b": 5, "c": 5}, outputs=["cab"])
>>> sol
{'cab': 50}

See also

Consult these test-cases from the full sources of the project:

  • test/test_graphtik.py:test_network_simple_merge()

  • test/test_graphtik.py:test_network_deep_merge()

Advanced pipelines

Depending on sideffects

graphtik.modifiers.sideffect(name, optional: bool = None)

sideffects denoting modifications beyond the scope of the solution.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (compilation) but they have a limited interaction with the operation’s underlying function; specifically:

  • input sideffects must exist in the solution as inputs when the computation starts, for an operation depending on them to kick in; this is not required for intermediate sideffects produced in the solution during execution;

  • input sideffects are NOT fed into underlying functions;

  • output sideffects are not expected from underlying functions, unless a rescheduled operation with partial outputs designates a sideffect as canceled by returning it with a falsy value (the operation must return a dictionary).

Hint

If modifications involve some input/output, prefer the sideffected() modifier.

You may still convey such relationships by including the dependency name in the string (in the end, it’s just a string), but graphtik will not enforce anything about it, e.g.:

>>> from graphtik import sideffect
>>> sideffect("price[sales_df]")
sideffect: 'price[sales_df]'

Example:

A typical use-case is to signify changes in some “global” context, outside the solution:

>>> from graphtik import operation, compose, sideffect
>>> @operation(provides=sideffect("lights off"))  # sideffect names can be anything
... def close_the_lights():
...    pass
>>> graph = compose('strip ease',
...     close_the_lights,
...     operation(
...         name='undress',
...         needs=[sideffect("lights off")],
...         provides="body")(lambda: "TaDa!")
... )
>>> graph
NetworkOperation('strip ease', needs=[sideffect: 'lights off'],
                 provides=[sideffect: 'lights off', 'body'],
                 x2 ops: close_the_lights, undress)
>>> sol = graph()
>>> sol
{'body': 'TaDa!'}

Figure: sideffect

Note

Something has to provide a sideffect for a function needing it to execute: either another operation, like above, or the user inputs; in the latter case, just specify some dummy value for the sideffect:

>>> sol = graph.compute({sideffect("lights off"): True})
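The rules listed for sideffect() can be mimicked by a toy executor that checks sideffect needs for availability but never passes them to, or expects them from, the underlying functions. The `Sideffect` marker class and `run()` helper are hypothetical, not graphtik's internals:

```python
class Sideffect(str):
    """Hypothetical marker: a dependency never passed to or expected from fns."""


def run(ops, inputs):
    """Toy executor; ops are (fn, needs, provides) tuples in dependency order."""
    solution = {k: v for k, v in inputs.items() if not isinstance(k, Sideffect)}
    available = set(inputs)  # every name known to exist, sideffects included
    for fn, needs, provides in ops:
        if not all(n in available for n in needs):
            continue  # an unsatisfied need, sideffect or not, blocks the op
        # Sideffect needs are NOT fed into the function.
        args = [solution[n] for n in needs if not isinstance(n, Sideffect)]
        result = fn(*args)
        # Sideffect provides are NOT expected from the function.
        real = [p for p in provides if not isinstance(p, Sideffect)]
        solution.update(zip(real, result if len(real) > 1 else [result]))
        available.update(provides)
    return solution


lights_off = Sideffect("lights off")
ops = [
    (lambda: None, [], [lights_off]),  # like close_the_lights
    (lambda: "TaDa!", [lights_off], ["body"]),  # like undress
]
print(run(ops, {}))  # {'body': 'TaDa!'}
# Without its provider, the sideffect may be given as a dummy user input:
print(run(ops[1:], {lights_off: True}))  # {'body': 'TaDa!'}
print(run(ops[1:], {}))  # {}: nothing provides the sideffect
```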

Modifying existing values in solutions

graphtik.modifiers.sideffected(dependency: str, sideffect0: str, *sideffects: str, optional: bool = None, fn_kwarg: str = None)

Annotates a sideffected dependency in the solution sustaining side-effects.

Like sideffect() but annotating a real dependency in the solution, allowing that dependency to be present both in needs and provides of the same function.

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes (emulated with dictionaries):

>>> from graphtik import operation, compose, sideffected
>>> @operation(needs="order_items",
...            provides=sideffected("ORDER", "Items", "Prices"))
... def new_order(items: list) -> "pd.DataFrame":
...     order = {"items": items}
...     # Pretend we get the prices from sales.
...     order['prices'] = list(range(1, len(order['items']) + 1))
...     return order
>>> @operation(
...     needs=[sideffected("ORDER", "Items"), "vat rate"],
...     provides=sideffected("ORDER", "VAT")
... )
... def fill_in_vat(order: "pd.DataFrame", vat: float):
...     order['VAT'] = [i * vat for i in order['prices']]
...     return order
>>> @operation(
...     needs=[sideffected("ORDER", "Prices", "VAT")],
...     provides=sideffected("ORDER", "Totals")
... )
... def finalize_prices(order: "pd.DataFrame"):
...     order['totals'] = [p + v for p, v in zip(order['prices'], order['VAT'])]
...     return order

To view all internal dependencies, enable DEBUG in configurations:

>>> from graphtik import debug_enabled
>>> with debug_enabled(True):
...     finalize_prices
FunctionalOperation(name='finalize_prices',
                    needs=[sideffected('ORDER'<--'Prices'),
                           sideffected('ORDER'<--'VAT')],
                    op_needs=[sideffected('ORDER'<--'Prices'),
                              sideffected('ORDER'<--'VAT')],
                    fn_needs=['ORDER'],
                    provides=[sideffected('ORDER'<--'Totals')],
                    op_provides=[sideffected('ORDER'<--'Totals')],
                    fn_provides=['ORDER'], fn='finalize_prices')

Notice that declaring a single sideffected with multiple sideffects expands into multiple “singular” sideffected dependencies in the network (check needs & op_needs above).
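The expansion can be sketched as a simple fan-out, assuming a hypothetical `Sideffected` pair type that holds one dependency and exactly one sideffect:

```python
from typing import NamedTuple


class Sideffected(NamedTuple):
    """Hypothetical stand-in: one real dependency plus exactly ONE sideffect."""
    dependency: str
    sideffect: str


def expand(dependency, sideffect0, *sideffects):
    """Fan a multi-sideffect declaration out into "singular" ones."""
    return [Sideffected(dependency, s) for s in (sideffect0, *sideffects)]


op_needs = expand("ORDER", "Prices", "VAT")
print(op_needs)
# [Sideffected(dependency='ORDER', sideffect='Prices'), Sideffected(dependency='ORDER', sideffect='VAT')]

# The underlying function still receives the real dependency only once.
fn_needs = list(dict.fromkeys(sd.dependency for sd in op_needs))
print(fn_needs)  # ['ORDER']
```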

>>> proc_order = compose('process order', new_order, fill_in_vat, finalize_prices)
>>> sol = proc_order.compute({
...      "order_items": ["toilet-paper", "soap"],
...      "vat rate": 0.18,
... })
>>> sol
{'order_items': ['toilet-paper', 'soap'],
 'vat rate': 0.18,
 'ORDER': {'items': ['toilet-paper', 'soap'],
           'prices': [1, 2],
           'VAT': [0.18, 0.36],
           'totals': [1.18, 2.36]}}

Figure: sideffecteds

Notice that although many functions consume and produce the same ORDER dependency (check fn_needs & fn_provides above), which would have formed cycles, the wrapping operations need and provide different sideffected instances, thus breaking the cycles.
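The cycle-breaking effect can be checked with the standard library's graphlib (Python 3.9+). The edge lists below are a hand-written approximation of the network above, using hypothetical node names such as `ORDER<--VAT` for the singular sideffecteds:

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(edges):
    """Return True if the (src, dst) edges can be topologically sorted."""
    graph = {}
    for src, dst in edges:
        graph.setdefault(dst, set()).add(src)  # graphlib wants node -> predecessors
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


# Plain dependency names: fill_in_vat & finalize_prices need AND provide ORDER.
plain = [
    ("order_items", "new_order"), ("new_order", "ORDER"),
    ("ORDER", "fill_in_vat"), ("vat rate", "fill_in_vat"), ("fill_in_vat", "ORDER"),
    ("ORDER", "finalize_prices"), ("finalize_prices", "ORDER"),
]
# Hypothetical names for the singular sideffected nodes: no node is re-entered.
distinct = [
    ("order_items", "new_order"),
    ("new_order", "ORDER<--Items"), ("new_order", "ORDER<--Prices"),
    ("ORDER<--Items", "fill_in_vat"), ("vat rate", "fill_in_vat"),
    ("fill_in_vat", "ORDER<--VAT"),
    ("ORDER<--Prices", "finalize_prices"), ("ORDER<--VAT", "finalize_prices"),
    ("finalize_prices", "ORDER<--Totals"),
]
print(is_acyclic(plain), is_acyclic(distinct))  # False True
```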

Resilience when operations fail (endurance)

A pipeline can keep executing operations even if some of them raise errors, as long as the failing ones are marked as endured:

>>> @operation(endured=1, provides=["space", "time"])
... def get_out():
...     raise ValueError("Quarantined!")
>>> get_out
FunctionalOperation!(name='get_out', needs=[], provides=['space', 'time'], fn='get_out')

Notice the exclamation mark (!) before the parenthesis in the string representation of the operation.

>>> @operation(needs="space", provides="fun")
... def exercise(where):
...     return "refreshed"
>>> @operation(endured=1, provides="time")
... def stay_home():
...     return "1h"
>>> @operation(needs="time", provides="fun")
... def read_book(for_how_long):
...     return "relaxed"
>>> netop = compose("covid19", get_out, stay_home, exercise, read_book)
>>> netop
NetworkOperation('covid19',
                 needs=['space', 'time'],
                 provides=['space', 'time', 'fun'],
                 x4 ops: get_out, stay_home, exercise, read_book)

Notice the thick outlines of the endured (or rescheduled, see below) operations.

When executed, the pipeline produces outputs, although one of its operations has failed:

>>> sol = netop()
>>> sol
{'time': '1h', 'fun': 'relaxed'}
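A toy executor shows the idea behind endurance: an endured operation's exception is recorded instead of propagated, and operations whose needs were never produced are skipped. The tuple layout and `run()` helper are assumptions for illustration only:

```python
def run(ops, inputs):
    """Toy executor; ops are (name, fn, needs, provides, endured) tuples."""
    solution, failures, skipped = dict(inputs), {}, []
    for name, fn, needs, provides, endured in ops:
        if not all(n in solution for n in needs):
            skipped.append(name)  # some need was never produced upstream
            continue
        try:
            result = fn(*(solution[n] for n in needs))
        except Exception as ex:
            if not endured:
                raise  # non-endured failures still abort the whole run
            failures[name] = ex  # endured: record the error and carry on
            continue
        solution.update(zip(provides, result if len(provides) > 1 else [result]))
    return solution, failures, skipped


def get_out():
    raise ValueError("Quarantined!")


ops = [
    ("get_out", get_out, [], ["space", "time"], True),
    ("stay_home", lambda: "1h", [], ["time"], True),
    ("exercise", lambda where: "refreshed", ["space"], ["fun"], False),
    ("read_book", lambda how_long: "relaxed", ["time"], ["fun"], False),
]
sol, failures, skipped = run(ops, {})
print(sol)  # {'time': '1h', 'fun': 'relaxed'}
print(failures, skipped)  # {'get_out': ValueError('Quarantined!')} ['exercise']
```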

You may still abort on failures afterwards, by asking the Solution to raise an appropriate exception:

>>> sol.scream_if_incomplete()
Traceback (most recent call last):
...
graphtik.network.IncompleteExecutionError:
Not completed x2 operations ['exercise', 'get_out'] due to x1 failures and x0 partial-ops:
  +--get_out: ValueError('Quarantined!')

Operations with partial outputs (rescheduled)

When the outputs actually produced depend on some condition in the inputs, the solution has to reschedule the plan amidst execution and consider the provides actually delivered:

>>> @operation(rescheduled=1,
...            needs="quarantine",
...            provides=["space", "time"],
...            returns_dict=True)
... def get_out_or_stay_home(quarantine):
...     if quarantine:
...          return {"time": "1h"}
...     else:
...          return {"space": "around the block"}
>>> get_out_or_stay_home
FunctionalOperation?(name='get_out_or_stay_home',
                     needs=['quarantine'],
                     provides=['space', 'time'],
                     fn{}='get_out_or_stay_home')
>>> @operation(needs="space", provides=["fun", "body"])
... def exercise(where):
...     return "refreshed", "strong feet"
>>> @operation(needs="time", provides=["fun", "brain"])
... def read_book(for_how_long):
...     return "relaxed", "popular physics"
>>> netop = compose("covid19", get_out_or_stay_home, exercise, read_book)

Depending on the “quarantine” state, we get to execute different parts of the pipeline:

>>> sol = netop(quarantine=True)
>>> sol = netop(quarantine=False)

In both cases, a warning is raised about the missing outputs, but execution proceeds normally with whatever can still be evaluated. You may collect a report of what has been canceled using this:

>>> print(sol.check_if_incomplete())
Not completed x1 operations ['read_book'] due to x0 failures and x1 partial-ops:
  +--get_out_or_stay_home: ['time']
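Rescheduling can be pictured as trusting the keys that an operation actually returns instead of its declared provides. In this hypothetical sketch, rescheduled operations return dictionaries (as with returns_dict=True above), and the missing provides are collected into a report:

```python
def run(ops, inputs):
    """Toy executor; ops are (name, fn, needs, provides, rescheduled) tuples."""
    solution, partials = dict(inputs), {}
    for name, fn, needs, provides, rescheduled in ops:
        if not all(n in solution for n in needs):
            continue  # skip ops whose needs were never delivered
        result = fn(*(solution[n] for n in needs))
        if rescheduled:
            # Trust the keys actually returned, not the declared provides.
            missing = [p for p in provides if p not in result]
            if missing:
                partials[name] = missing
            solution.update((k, v) for k, v in result.items() if k in provides)
        else:
            solution.update(zip(provides, result if len(provides) > 1 else [result]))
    return solution, partials


def get_out_or_stay_home(quarantine):
    return {"time": "1h"} if quarantine else {"space": "around the block"}


ops = [
    ("get_out_or_stay_home", get_out_or_stay_home, ["quarantine"], ["space", "time"], True),
    ("exercise", lambda where: ("refreshed", "strong feet"), ["space"], ["fun", "body"], False),
    ("read_book", lambda t: ("relaxed", "popular physics"), ["time"], ["fun", "brain"], False),
]
sol, partials = run(ops, {"quarantine": False})
print(partials)  # {'get_out_or_stay_home': ['time']}
print(sol["fun"])  # refreshed
```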

In case you wish to cancel the output of a single-result operation, return the special value graphtik.NO_RESULT.
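A sentinel object compared by identity is one way to model this. The `NO_RESULT` below is a hypothetical stand-in defined locally, not an import of graphtik.NO_RESULT, and `run()` is a toy executor for single-result operations:

```python
class _NoResult:
    """Hypothetical sentinel standing in for graphtik.NO_RESULT."""

    def __repr__(self):
        return "NO_RESULT"


NO_RESULT = _NoResult()


def run(ops, inputs):
    """Toy executor for single-result ops given as (fn, needs, provide)."""
    solution = dict(inputs)
    for fn, needs, provide in ops:
        if not all(n in solution for n in needs):
            continue  # a canceled upstream output leaves this need missing
        value = fn(*(solution[n] for n in needs))
        if value is not NO_RESULT:  # identity check: the sentinel is unique
            solution[provide] = value
    return solution


ops = [
    (lambda q: NO_RESULT if q else "around the block", ["quarantine"], "space"),
    (lambda where: "refreshed", ["space"], "fun"),
]
print(run(ops, {"quarantine": True}))  # {'quarantine': True}
print(run(ops, {"quarantine": False}))
# {'quarantine': False, 'space': 'around the block', 'fun': 'refreshed'}
```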