2. Pipelines

Graphtik’s compose() factory handles the work of tying together operation instances into a runnable computation graph.

The simplest use case is to assemble a collection of individual operations into a runnable computation graph. The example script from Quick start illustrates this well:

>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> def abspow(a, p):
...    """Computes |a|^p. """
...    c = abs(a) ** p
...    return c
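
A quick sanity check of the helper, for instance |-2|^3 == 8:

>>> abspow(-2, p=3)
8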

The call to compose() below ties these operations into a runnable computation graph:

>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> graphop = compose("graphop",
...    operation(name="mul1", needs=["a", "b"], provides=["ab"])(mul),
...    operation(name="sub1", needs=["a", "ab"], provides=["a_minus_ab"])(sub),
...    operation(name="abspow1", needs=["a_minus_ab"], provides=["abs_a_minus_ab_cubed"])
...    (partial(abspow, p=3))
... )

This yields a graph which, when plotted, looks like this (circles are operations, squares are data, and octagons are parameters; see Plotting):

>>> graphop.plot('calc_power.svg')  

graphop

Running a pipeline

The graph composed above can be run by simply calling it with a dictionary of values with keys corresponding to the named dependencies (needs & provides):

>>> # Run the graph and request all of the outputs.
>>> out = graphop(a=2, b=5)
>>> out
{'a': 2, 'b': 5, 'ab': 10, 'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}
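
The solution behaves like a dictionary, so individual results can also be read by key:

>>> out['abs_a_minus_ab_cubed']
512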

You may plot the solution:

>>> out.plot('a_solution.svg')  

the solution of the graph

Producing a subset of outputs

By default, calling a graph-operation on a set of inputs will yield all of that graph’s outputs. You can use the outputs parameter to request only a subset. For example, if graphop is as above:

>>> # Run the graph-operation and request a subset of the outputs.
>>> out = graphop.compute({'a': 2, 'b': 5}, outputs="a_minus_ab")
>>> out
{'a_minus_ab': -8}

When asked for a subset of the graph’s outputs, Graphtik does two things:

  • it prunes any operations that are not on the path from given inputs to the requested outputs (e.g. the abspow1 operation, above, is not executed);

  • it evicts any intermediate data from the solution as soon as they are no longer needed.

You may see the eviction (the second point above) in action by including the sequence of execution steps in the plot:

>>> dot = out.plot(theme={"show_steps": True})
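
The outputs parameter also accepts a collection of names; anything requested is kept around (not evicted). For instance:

>>> out = graphop.compute({'a': 2, 'b': 5}, outputs=['ab', 'a_minus_ab'])
>>> out['ab'], out['a_minus_ab']
(10, -8)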

Short-circuiting a pipeline

You can short-circuit a graph computation, making certain inputs unnecessary, by providing a value that lies further downstream in the graph than those inputs. For example, in the graph-operation we’ve been working with, you could provide the value of a_minus_ab to make the inputs a and b unnecessary:

>>> # Run the graph-operation, providing a downstream value instead of a and b.
>>> out = graphop(a_minus_ab=-8)
>>> out
{'a_minus_ab': -8, 'abs_a_minus_ab_cubed': 512}

When you do this, any operation nodes that are not on a path from the downstream input to the requested outputs (i.e. predecessors of the downstream input) are not computed. For example, the mul1 and sub1 operations are not executed here.

This can be useful if you have a graph-operation that accepts alternative forms of the same input. For example, if your graph-operation requires a PIL.Image as input, you could allow your graph to be run in an API server by adding an earlier operation that accepts as input a string of raw image data and converts that data into the needed PIL.Image. Then, you can either provide the raw image data string as input, or you can provide the PIL.Image if you have it and skip providing the image data string.
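
A minimal sketch of such a pipeline follows (the operation and dependency names are illustrative, and the decode step is a stand-in for a real PIL.Image.open() call):

>>> def decode_image(image_bytes):
...     # Stand-in for decoding the raw bytes, e.g. with PIL.Image.open().
...     return "decoded-image"
>>> def process_image(image):
...     return f"processed {image}"
>>> imgpipe = compose("imgpipe",
...     operation(decode_image, name="decode", needs="image_bytes", provides="image"),
...     operation(process_image, name="process", needs="image", provides="result"),
... )
>>> imgpipe(image_bytes=b"raw data")["result"]   # feed the raw form, or ...
'processed decoded-image'
>>> imgpipe(image="decoded-image")["result"]     # ... provide the image directly.
'processed decoded-image'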

Extending pipelines

Sometimes we begin with existing computation graph(s) which we want to extend with other operations and/or pipelines.

There are 2 ways to combine pipelines together, merging (the default) and nesting.

Merging

This is the default mode for compose() when combining individual operations, and it works exactly the same when whole pipelines are involved.

For example, let’s suppose that this simple pipeline describes the daily scheduled workload of an “empty” day:

>>> weekday = compose("weekday",
...     operation(str, name="wake up", needs="backlog", provides="tasks"),
...     operation(str, name="sleep", needs="tasks", provides="todos"),
... )

Now let’s do some “work”:

>>> weekday = compose("weekday",
...     operation(lambda t: (t[:-1], t[-1:]),
...               name="work!", needs="tasks", provides=["tasks done", "todos"]),
...     operation(str, name="sleep"),
...     weekday,
... )

Notice that the pipeline to override was added last, at the bottom; that’s because the operations added earlier in the call (further to the left) override any identically-named operations added later.

Notice also that the overridden “sleep” operation no longer has any actual role in the schedule. We can eliminate “sleep” altogether by pre-registering the special NULL_OP operation under the same name, “sleep”:

>>> from graphtik import NULL_OP
>>> weekday = compose("weekday", NULL_OP("sleep"), weekday)

Nesting

Other times we want to preserve all the operations composed, regardless of clashes in their names. This is doable with compose(..., nest=True).

Let’s build a schedule for the 3-day week (because of covid-19…), by combining 3 mutated copies of the daily schedule we built earlier:

>>> weekdays = [weekday.withset(name=f"day {i}") for i in range(3)]
>>> week = compose("week", *weekdays, nest=True)

We now have 3 “isolated” clusters, because all operations & data have been prefixed with the name of the pipeline they originally belonged to.

Let’s suppose we want to break the isolation, and have all sub-pipelines consume & produce from a common “backlog” (n.b. in real life, we would have “feeder” & “collector” operations).

We do that by passing, as the nest parameter, a callable() which decides which names of the original pipelines (operations & dependencies) should be prefixed (see also compose() & RenArgs for how to use that parameter):

>>> def rename_predicate(ren_args):
...     if ren_args.name not in ("backlog", "tasks done", "todos"):
...         return True
>>> week = compose("week", *weekdays, nest=rename_predicate)

Finally we may run the week’s schedule and get the outcome (whatever that might be :-); hover over the results to see them:

>>> sol = week.compute({'backlog': "a lot!"})
>>> sol
{'backlog': 'a lot!',
 'day 0.tasks': 'a lot!',
 'tasks done': 'a lot', 'todos': '!',
 'day 1.tasks': 'a lot!',
 'day 2.tasks': 'a lot!'}
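
Note how the un-prefixed “tasks done” & “todos” values are shared across the days; they can be read back as usual:

>>> sol["tasks done"], sol["todos"]
('a lot', '!')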
>>> dot = sol.plot(clusters=True)

Tip

We had to plot with clusters=True to prevent the plan from inserting the “after pruning” cluster (see PlotArgs.clusters).

See also

Consult these test-cases from the full sources of the project:

  • test/test_combine.py

Advanced pipelines

Depending on sideffects

graphtik.modifier.sfx(name, optional: bool = None) → graphtik.modifier._Modifier

sideffects denoting modifications beyond the scope of the solution.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (planning) but they have a limited interaction with the operation’s underlying function; specifically:

  • an input sideffect must already exist in the solution when the computation starts, for an operation depending on it to kick in - but this is not necessary for intermediate sideffects produced later in the solution during execution;

  • input sideffects are NOT fed into underlying functions;

  • output sideffects are not expected from underlying functions, unless a rescheduled operation with partial outputs designates a sideffected as canceled by returning it with a falsy value (the operation must return a dictionary).

Hint

If modifications involve some input/output, prefer the sfxed() modifier.

You may still convey these relationships by including the dependency name in the string - in the end, it’s just a string - but no enforcement of any kind will happen from graphtik, like:

>>> from graphtik import sfx
>>> sfx("price[sales_df]")
sfx('price[sales_df]')

Example:

A typical use-case is to signify changes in some “global” context, outside solution:

>>> from graphtik import operation, compose, sfx
>>> @operation(provides=sfx("lights off"))  # sideffect names can be anything
... def close_the_lights():
...    pass
>>> graph = compose('strip ease',
...     close_the_lights,
...     operation(
...         name='undress',
...         needs=[sfx("lights off")],
...         provides="body")(lambda: "TaDa!")
... )
>>> graph
Pipeline('strip ease',
                 needs=[sfx('lights off')],
                 provides=[sfx('lights off'), 'body'],
                 x2 ops: close_the_lights, undress)
>>> sol = graph()
>>> sol
{'body': 'TaDa!'}

sideffect

Note

Something has to provide a sideffect for a function needing it to execute - this could be another operation, like above, or the user-inputs; just specify some truthy value for the sideffect:

>>> sol = graph.compute({sfx("lights off"): True})
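
The dependent operation then runs as usual:

>>> sol['body']
'TaDa!'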

Modifying existing values in solutions

graphtik.modifier.sfxed(dependency: str, sfx0: str, *sfx_list: str, keyword: str = None, optional: bool = None, accessor: graphtik.modifier.Accessor = None, jsonp=None) → graphtik.modifier._Modifier

Annotates a sideffected dependency in the solution sustaining side-effects.

Parameters
  • keyword – the name of the function argument it corresponds to. When the dependency is optional and this is falsy, it defaults to the dependency name, so as to always behave like a keyword argument and to preserve the function-argument name if the dependency is ever renamed. When not optional, it may simply be left unset.

  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp

    If given, it may be the pre-split parts of the json-pointer path for the dependency – in that case, the dependency name is irrelevant – or a falsy (but not None) value, to disable the automatic interpretation of the dependency name as a json-pointer path, regardless of any contained slashes.

    Tip

    If accessing pandas, you may pass an already-split path with its last part being a callable indexer.

Like sfx() but annotating a real dependency in the solution, allowing that dependency to be present both in needs and provides of the same function.

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes (emulated with dictionaries):

>>> from graphtik import operation, compose, sfxed
>>> @operation(needs="order_items",
...            provides=sfxed("ORDER", "Items", "Prices"))
... def new_order(items: list) -> "pd.DataFrame":
...     order = {"items": items}
...     # Pretend we get the prices from sales.
...     order['prices'] = list(range(1, len(order['items']) + 1))
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Items"), "vat rate"],
...     provides=sfxed("ORDER", "VAT")
... )
... def fill_in_vat(order: "pd.DataFrame", vat: float):
...     order['VAT'] = [i * vat for i in order['prices']]
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Prices", "VAT")],
...     provides=sfxed("ORDER", "Totals")
... )
... def finalize_prices(order: "pd.DataFrame"):
...     order['totals'] = [p + v for p, v in zip(order['prices'], order['VAT'])]
...     return order

To view all internal dependencies, enable DEBUG in configurations:

>>> from graphtik.config import debug_enabled
>>> with debug_enabled(True):
...     finalize_prices
FnOp(name='finalize_prices',
                    needs=[sfxed('ORDER', 'Prices'), sfxed('ORDER', 'VAT')],
                    op_needs=[sfxed('ORDER', 'Prices'), sfxed('ORDER', 'VAT')],
                    _fn_needs=['ORDER'],
                    provides=[sfxed('ORDER', 'Totals')],
                    op_provides=[sfxed('ORDER', 'Totals')],
                    _fn_provides=['ORDER'],
                    fn='finalize_prices')

Notice that declaring a single sideffected with many items in sfx_list expands into multiple “singular” sideffected dependencies in the network (check needs & op_needs above).

>>> proc_order = compose('process order', new_order, fill_in_vat, finalize_prices)
>>> sol = proc_order.compute({
...      "order_items": ["toilet-paper", "soap"],
...      "vat rate": 0.18,
... })
>>> sol
{'order_items': ['toilet-paper', 'soap'],
 'vat rate': 0.18,
 'ORDER': {'items': ['toilet-paper', 'soap'],
           'prices': [1, 2],
           'VAT': [0.18, 0.36],
           'totals': [1.18, 2.36]}}

sideffecteds

Notice that although many functions consume & produce the same ORDER dependency (check _fn_needs & _fn_provides, above), which would otherwise have formed cycles, the wrapping operations need and provide different sideffected instances, breaking the cycles.
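
Individual fields of the sideffected ORDER can be read straight off the solution above, for instance:

>>> sol["ORDER"]["totals"]
[1.18, 2.36]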

See also

The elaborate example in Hierarchical data and further tricks section.

Resilience when operations fail (endurance)

It is possible for a pipeline to keep executing operations even if some of them raise errors, as long as the failing ones are marked as endured:

>>> @operation(endured=1, provides=["space", "time"])
... def get_out():
...     raise ValueError("Quarantined!")
>>> get_out
FnOp!(name='get_out', provides=['space', 'time'], fn='get_out')

Notice the exclamation mark (!) before the parenthesis in the string representation of the operation.

>>> @operation(needs="space", provides="fun")
... def exercise(where):
...     return "refreshed"
>>> @operation(endured=1, provides="time")
... def stay_home():
...     return "1h"
>>> @operation(needs="time", provides="fun")
... def read_book(for_how_long):
...     return "relaxed"
>>> pipeline = compose("covid19", get_out, stay_home, exercise, read_book)
>>> pipeline
Pipeline('covid19',
                 needs=['space', 'time'],
                 provides=['space', 'time', 'fun'],
                 x4 ops: get_out, stay_home, exercise, read_book)

Notice the thick outlines of the endured (or rescheduled, see below) operations.

When executed, the pipeline produces outputs even though one of its operations has failed:

>>> sol = pipeline()
>>> sol
{'time': '1h', 'fun': 'relaxed'}

You may still abort on failures, later, by raising an appropriate exception from Solution:

>>> sol.scream_if_incomplete()
Traceback (most recent call last):
...
graphtik.base.IncompleteExecutionError:
Not completed x2 operations ['exercise', 'get_out'] due to x1 failures and x0 partial-ops:
  +--get_out: ValueError('Quarantined!')
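
If you only need the report, check_if_incomplete() returns the same error (or None) without raising:

>>> print(sol.check_if_incomplete())
Not completed x2 operations ['exercise', 'get_out'] due to x1 failures and x0 partial-ops:
  +--get_out: ValueError('Quarantined!')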

Operations with partial outputs (rescheduled)

In case the outputs actually produced depend on some condition of the inputs, the solution has to reschedule the plan amidst execution, and consider the actual provides delivered:

>>> @operation(rescheduled=1,
...            needs="quarantine",
...            provides=["space", "time"],
...            returns_dict=True)
... def get_out_or_stay_home(quarantine):
...     if quarantine:
...          return {"time": "1h"}
...     else:
...          return {"space": "around the block"}
>>> get_out_or_stay_home
FnOp?(name='get_out_or_stay_home',
                     needs=['quarantine'],
                     provides=['space', 'time'],
                     fn{}='get_out_or_stay_home')
>>> @operation(needs="space", provides=["fun", "body"])
... def exercise(where):
...     return "refreshed", "strong feet"
>>> @operation(needs="time", provides=["fun", "brain"])
... def read_book(for_how_long):
...     return "relaxed", "popular physics"
>>> pipeline = compose("covid19", get_out_or_stay_home, exercise, read_book)

Depending on the “quarantine” state, we get to execute a different part of the pipeline:

>>> sol = pipeline(quarantine=True)
>>> sol = pipeline(quarantine=False)

In both cases, a warning gets raised about the missing outputs, but the execution proceeds regularly, evaluating whatever is possible. You may collect a report of what has been canceled using this:

>>> print(sol.check_if_incomplete())
Not completed x1 operations ['read_book'] due to x0 failures and x1 partial-ops:
  +--get_out_or_stay_home: ['time']

In case you wish to cancel the output of a single-result operation, return the special value graphtik.NO_RESULT.
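
For instance, a minimal sketch of cancelling a single-result operation this way (the names here are illustrative):

>>> from graphtik import NO_RESULT
>>> @operation(rescheduled=True, needs="quarantine", provides="space")
... def go_out(quarantine):
...     # Cancel the single "space" output while quarantined.
...     return NO_RESULT if quarantine else "around the block"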

Hierarchical data and further tricks

Working with hierarchical data relies upon dependencies expressed as json-pointer paths into the solution data-tree. Let’s retrofit the “weekly tasks” example from the Nesting section, above.

In the previous example, we had left out the collection of the tasks and the TODOs – this time we’re going to:

  1. properly distribute & backlog the tasks to be done across the days using sideffected dependencies to modify the original stack of tasks in-place, while the workflow is running,

  2. exemplify further the use of operation nesting & renaming, and

  3. access the wrapping operation and execution machinery from within the function by using task_context, and finally

  4. store the input backlog, the work done, and the TODOs from the tasks in this data-tree:

    +--backlog
    +--Monday.tasks
    +--Wednesday.tasks
    +--Tuesday.tasks
    +--daily_tasks/
       +--Monday
       +--Tuesday
       +--Wednesday
    +--weekly_tasks
    +--todos
    
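
Before building it, here is a minimal, hypothetical sketch of how a dependency containing slashes is interpreted as a json-pointer path and writes into a nested sub-document of the solution (the names are made up for illustration):

>>> nest_demo = compose("jsonp demo",
...     operation(lambda v: v, name="stash", needs="val", provides="doc/inner/val"),
... )
>>> nest_demo.compute({"val": 7})["doc"]["inner"]["val"]
7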

First, let’s build the single day’s workflow, without any nesting:

>>> from graphtik import NO_RESULT, sfxed
>>> from graphtik.base import RenArgs  # type hints for autocompletion.
>>> from graphtik.execution import task_context
>>> from graphtik.modifier import dep_renamed
>>> todos = sfxed("backlog", "todos")
>>> @operation(name="wake up",
...            needs="backlog",
...            provides=["tasks", todos],
...            rescheduled=True
... )
... def pick_tasks(backlog):
...     if not backlog:
...         return NO_RESULT
...     # Pick from backlog 1/3 of len-of-chars of my day-name.
...     n_tasks = int(len(task_context.get().op.name) / 3)
...     my_tasks, todos = backlog[:n_tasks], backlog[n_tasks:]
...     return my_tasks, todos

The actual work is emulated with a conveyor operation:

>>> do_tasks = operation(fn=None, name="work!", needs="tasks", provides="daily_tasks")
>>> weekday = compose("weekday", pick_tasks, do_tasks)

Notice that the “backlog” sideffected result of the “wake up” operation is also listed in its needs; through this trick, each day can remove the tasks it completed from the initial backlog of tasks, for the next day to pick up. The “todos” sfx is just a name to denote the kind of modification performed on the “backlog”.

Note also that the tasks passed between the “wake up” –> “work!” operations are not hierarchical, but kept “private” within each day by nesting them with a dot (.):

Now let’s clone the daily-task x3 and nest it, to make a 3-day workflow:

>>> days = ["Monday", "Tuesday", "Wednesday"]
>>> weekdays = [weekday.withset(name=d) for d in days]
>>> def nester(ra: RenArgs):
...     dep = ra.name
...     if ra.typ == "op":
...         return True  # Nest by.dot.
...     if ra.typ.endswith(".jsonpart"):
...         return False  # Don't touch the json-pointer parts.
...     if dep == "tasks":
...         return True  # Nest by.dot
...     if dep == "daily_tasks":
...         # Nest as subdoc.
...         return dep_renamed(dep, lambda n: f"{n}/{ra.parent.name}")
...     return False
>>> week = compose("week", *weekdays, nest=nester)

And this is now the pipeline for a 3-day week. Notice the daily_tasks/{day} subdoc nodes at the bottom of the diagram:

Finally, we combine all the weekly work using a “collector” operation:

>>> from graphtik import vararg
>>> @operation(
...     name="collect tasks",
...     needs=[todos, *(vararg(f"daily_tasks/{d}") for d in days)],
...     provides=["weekly_tasks", "todos"],
... )
... def collector(backlog, *daily_tasks):
...     return daily_tasks or (), backlog or ()
...
>>> week = compose("week", week, collector)

This is the final week pipeline:

We can now feed the week pipeline with a “workload” of 17 imaginary tasks. We know from each “wake up” operation that Monday, Tuesday & Wednesday will pick 4, 5 & 5 tasks respectively, leaving 3 tasks as “todo”:

>>> sol = week.compute({"backlog": range(17)})
>>> sol
{'backlog': range(14, 17),
 'Monday.tasks': range(0, 4),
 'daily_tasks': {'Monday': range(0, 4),
                'Tuesday': range(4, 9),
                'Wednesday': range(9, 14)},
 'Tuesday.tasks': range(4, 9),
 'Wednesday.tasks': range(9, 14),
 'weekly_tasks': (range(0, 4), range(4, 9), range(9, 14)),
 'todos': range(14, 17)}
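
Nested sub-documents are read back with plain indexing, for example:

>>> sol["daily_tasks"]["Wednesday"]
range(9, 14)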

Or we can reduce the workload, and see that Wednesday is left without any work to do:

>>> sol = week.compute(
...     named_inputs={"backlog": range(9)},
...     outputs=["daily_tasks", "weekly_tasks", "todos"])

Hover over the data nodes to see the results. Specifically check the “daily_tasks” which is a nested dictionary:

>>> sol
{'daily_tasks': {'Monday': range(0, 4),
                'Tuesday': range(4, 9)},
 'weekly_tasks': (range(0, 4), range(4, 9)),
 'todos': ()}