2. Pipelines

Graphtik’s compose() factory handles the work of tying together operation instances into a runnable computation graph.

The simplest use case is to assemble a collection of individual operations into a runnable computation graph. The sample formula (1) from the Quick start section illustrates this well:

>>> from operator import mul, sub
>>> from functools import partial
>>> from graphtik import compose, operation
>>> def abspow(a, p):
...    """Computes |a|^p. """
...    c = abs(a) ** p
...    return c

The call to compose() below yields a runnable computation graph (in the plots, circles are operations, squares are data, and octagons are parameters):

>>> # Compose the mul, sub, and abspow operations into a computation graph.
>>> formula = compose("maths",
...    operation(name="mul1", needs=["α", "β"], provides=["α×β"])(mul),
...    operation(name="sub1", needs=["α", "α×β"], provides=["α-α×β"])(sub),
...    operation(name="abspow1", needs=["α-α×β"], provides=["|α-α×β|³"])
...    (partial(abspow, p=3))
... )

This yields a graph which looks like this (see Plotting):

>>> formula.plot('calc_power.svg')  

sample formula (1)

Compiling and Running a pipeline

The graph composed above can be run by simply calling it with a dictionary of values with keys corresponding to the named dependencies (needs & provides):

>>> # Run the graph and request all of the outputs.
>>> out = formula(α=2, β=5)
>>> out
{'α': 2, 'β': 5, 'α×β': 10, 'α-α×β': -8, '|α-α×β|³': 512}

You may plot the solution:

>>> out.plot('a_solution.svg')  

The solution of the graph.

Alternatively, you may only compile (and validate()) the pipeline, to see which operations will be included in the graph (assuming the graph is solvable at all), based on the given inputs/outputs combination:

>>> plan = formula.compile(['α', 'β'], outputs='α-α×β')
>>> plan
ExecutionPlan(needs=['α', 'β'],
              provides=['α-α×β'],
              x5 steps: mul1, β, sub1, α, α×β)
>>> plan.validate()  # all fine

Plotting the plan reveals the pruned operations, and numbers operations and evictions (see next section) in the order of execution:

>>> plan.plot()  

Obtaining just the execution plan.

Tip

Hover over pruned (grey) operations to see why they were excluded from the plan.

But if an impossible combination of inputs & outputs is asked, the plan comes out empty:

>>> plan = formula.compile('α', outputs="α-α×β")
>>> plan
ExecutionPlan(needs=[], provides=[], x0 steps: )
>>> plan.validate()
Traceback (most recent call last):
ValueError: Unsolvable graph:
  +--Network(x8 nodes, x3 ops: mul1, sub1, abspow1)
  +--possible inputs: ['α', 'β', 'α×β', 'α-α×β']
  +--possible outputs: ['α×β', 'α-α×β', '|α-α×β|³']

Evictions: producing a subset of outputs

By default, calling a graph-operation on a set of inputs will yield all of that graph’s outputs. You can use the outputs parameter to request only a subset. For example, if formula is as above:

>>> # Run the graph-operation and request a subset of the outputs.
>>> out = formula.compute({'α': 2, 'β': 5}, outputs="α-α×β")
>>> out
{'α-α×β': -8}

When asking a subset of the graph’s outputs, Graphtik does 2 things:

  • it prunes any operations that are not on the path from given inputs to the requested outputs (e.g. the abspow1 operation, above, is not executed);

  • it evicts any intermediate data from solution as soon as they are not needed.
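The pruning in (1) is essentially a backward traversal from the requested outputs. The following is a minimal plain-Python sketch of that idea (not graphtik code; names and the simple reversed-order walk are illustrative, and assume the operations are already listed in topological order):

```python
# Sketch of output-driven pruning: each operation is (name, needs, provides);
# we walk backwards from the requested outputs, keeping only operations
# that produce something still needed.

def prune(ops, inputs, outputs):
    needed = set(outputs)
    kept = []
    for name, needs, provides in reversed(ops):
        if needed & set(provides):          # op produces something needed
            kept.append(name)
            needed |= set(needs) - set(inputs)
    return list(reversed(kept))

ops = [
    ("mul1", ["a", "b"], ["ab"]),
    ("sub1", ["a", "ab"], ["a_minus_ab"]),
    ("abspow1", ["a_minus_ab"], ["abs_pow"]),
]
print(prune(ops, inputs={"a", "b"}, outputs={"a_minus_ab"}))
# ['mul1', 'sub1'] -- abspow1 is dropped, mirroring the pruning above
```

Asking for the final output instead keeps all three operations, just as the full run does.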

You may see (2) in action by including the sequence of execution steps into the plot:

>>> dot = out.plot(theme={"show_steps": True})

Tip

Read Plot customizations to understand the trick with the plot theme, above.

Short-circuiting a pipeline

You can short-circuit a graph computation, making certain inputs unnecessary, by providing a value that lies further downstream in the graph than those inputs. For example, in the graph-operation we’ve been working with, you could provide the value of α-α×β to make the inputs α and β unnecessary:

>>> # Run the graph-operation and request a subset of the outputs.
>>> out = formula.compute({"α-α×β": -8})
>>> out
{'α-α×β': -8, '|α-α×β|³': 512}

When you do this, any operation nodes that are not on a path from the downstream input to the requested outputs (i.e. predecessors of the downstream input) are not computed. For example, the mul1 and sub1 operations are not executed here.

This can be useful if you have a graph-operation that accepts alternative forms of the same input. For example, if your graph-operation requires a PIL.Image as input, you could allow your graph to be run in an API server by adding an earlier operation that accepts as input a string of raw image data and converts that data into the needed PIL.Image. Then, you can either provide the raw image data string as input, or you can provide the PIL.Image if you have it and skip providing the image data string.
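This pattern can be sketched in plain Python (no PIL or graphtik involved; the names are hypothetical): a converter step runs only when the downstream value is absent from the inputs.

```python
# Sketch of the "alternative input forms" pattern: the decode step
# is skipped whenever the downstream value ("image") is already given.

def decode_image(raw: str) -> dict:
    # stand-in for a raw-data -> PIL.Image conversion
    return {"pixels": raw.upper()}

def run(inputs: dict) -> dict:
    sol = dict(inputs)
    if "image" not in sol:                  # short-circuit: skip upstream op
        sol["image"] = decode_image(sol["raw_data"])
    sol["thumbnail"] = sol["image"]["pixels"][:3]
    return sol

print(run({"raw_data": "abcdef"}))          # converter executed
print(run({"image": {"pixels": "XYZ123"}})) # converter skipped
```

In graphtik the same effect falls out of pruning: the converter operation is a predecessor of the provided downstream value, so it is simply not computed.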

Re-computations

If you take the solution from a pipeline, change some values in it, and feed it back into the same pipeline as inputs, the re-computation will, perhaps unexpectedly, fail with an Unsolvable graph error: all dependencies already have values, so any operations producing them are pruned out, until no operation remains:

>>> new_inp = formula.compute({"α": 2, "β": 5})
>>> new_inp["α"] = 20
>>> formula.compute(new_inp)
Traceback (most recent call last):
ValueError: Unsolvable graph:
+--Network(x8 nodes, x3 ops: mul1, sub1, abspow1)
+--possible inputs: ['α', 'β', 'α×β', 'α-α×β']
+--possible outputs: ['α×β', 'α-α×β', '|α-α×β|³']

One way to proceed is to avoid recompiling, by directly executing the pre-compiled plan, which will run all the original operations on the new values:

>>> sol = new_inp.plan.execute(new_inp)
>>> sol
{'α': 20, 'β': 5, 'α×β': 100, 'α-α×β': -80, '|α-α×β|³': 512000}
>>> [op.name for op in sol.executed]
['mul1', 'sub1', 'abspow1']

Hint

Notice that all values have been marked as overwrites.

But that trick wouldn’t work if the modified value is an inner dependency of the graph – in that case, the upstream operations would simply overwrite it:

>>> new_inp["α-α×β"] = 123
>>> sol = new_inp.plan.execute(new_inp)
>>> sol["α-α×β"]  # should have been 123!
-80

You can still do that using the recompute_from argument of Pipeline.compute(). It accepts a string (or a list) of dependencies from which to recompute, downstream:

>>> sol = formula.compute(new_inp, recompute_from="α-α×β")
>>> sol
{'α': 20, 'β': 5, 'α×β': 10, 'α-α×β': 123, '|α-α×β|³': 1860867}
>>> [op.name for op in sol.executed]
['abspow1']

The old values are retained, although the operations producing them have been pruned from the plan.

Note

The value of α-α×β is no longer the correct result of sub1 operation, above it (hover to see sub1 inputs & output).

Extending pipelines

Sometimes we begin with existing computation graph(s) which we want to extend with other operations and/or pipelines.

There are 2 ways to combine pipelines together, merging (the default) and nesting.

Merging

This is the default mode for compose() when combining individual operations, and it works exactly the same when whole pipelines are involved.

For example, let’s suppose that this simple pipeline describes the daily scheduled workload of an “empty” day:

>>> weekday = compose("weekday",
...     operation(str, name="wake up", needs="backlog", provides="tasks"),
...     operation(str, name="sleep", needs="tasks", provides="todos"),
... )

Now let’s do some “work”:

>>> weekday = compose("weekday",
...     operation(lambda t: (t[:-1], t[-1:]),
...               name="work!", needs="tasks", provides=["tasks done", "todos"]),
...     operation(str, name="sleep"),
...     weekday,
... )

Notice that the pipeline to override was added last, at the bottom; that’s because the operations added earlier in the call (further to the left) override any identically-named operations added later.

Notice also that the overridden “sleep” operation no longer plays any actual role in the schedule. We can eliminate “sleep” altogether by using the excludes argument (it also accepts a list):

>>> weekday = compose("weekday", weekday, excludes="sleep")
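The merge rule – identically-named operations earlier in the compose() call win – can be emulated with a dict that keeps the first occurrence of each name. A sketch (not graphtik internals; the op tuples are stand-ins):

```python
# Sketch of merge semantics: the first occurrence of each op name wins,
# then `excludes` (a string or a list) removes ops from the result.

def merge(*ops, excludes=()):
    merged = {}
    for name, fn in ops:
        if name not in merged:              # earlier entries shadow later ones
            merged[name] = fn
    for name in ([excludes] if isinstance(excludes, str) else excludes):
        merged.pop(name, None)
    return merged

new = [("work!", "split tasks"), ("sleep", "noop")]
old = [("wake up", "str"), ("sleep", "str")]
print(list(merge(*new, *old, excludes="sleep")))
# ['work!', 'wake up'] -- the overriding "sleep" shadowed the old one,
# then excludes removed it altogether
```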

Nesting

Other times we want to preserve all the operations composed, regardless of clashes in their names. This is doable with compose(..., nest=True).

Let’s build a schedule for the 3-day week (covid19 γαρ…), by combining 3 mutated copies of the daily schedule we built earlier:

>>> weekdays = [weekday.withset(name=f"day {i}") for i in range(3)]
>>> week = compose("week", *weekdays, nest=True)

We now have 3 “isolated” clusters, because all operations & data have been prefixed with the name of the pipeline they originally belonged to.

Let’s suppose we want to break the isolation, and have all sub-pipelines consume & produce from a common “backlog” (n.b. in real life, we would have “feeder” & “collector” operations).

We do that by passing as the nest parameter a callable() which will decide which names of the original pipeline (operations & dependencies) should be prefixed (see also compose() & RenArgs for how to use that param):

>>> def rename_predicate(ren_args):
...     if ren_args.name not in ("backlog", "tasks done", "todos"):
...         return True
>>> week = compose("week", *weekdays, nest=rename_predicate)

Finally we may run the week’s schedule and get the outcome (whatever that might be :-), hover over the results to see them):

>>> sol = week.compute({'backlog': "a lot!"})
>>> sol
{'backlog': 'a lot!',
 'day 0.tasks': 'a lot!',
 'tasks done': 'a lot', 'todos': '!',
 'day 1.tasks': 'a lot!',
 'day 2.tasks': 'a lot!'}
>>> dot = sol.plot(clusters=True)

Tip

We had to plot with clusters=True to prevent the plan from inserting the “after pruning” cluster (see PlotArgs.clusters).

See also

Consult these test-cases from the full sources of the project:

  • test/test_combine.py

Advanced pipelines

Depending on sideffects

graphtik.modifier.sfx(name, optional: Optional[bool] = None) → graphtik.modifier._Modifier

sideffects denoting modifications beyond the scope of the solution.

Both needs & provides may be designated as sideffects using this modifier. They work as usual while solving the graph (planning) but they have a limited interaction with the operation’s underlying function; specifically:

  • input sideffects must exist in the solution as inputs when the computation starts, for an operation depending on them to kick in – but this is not necessary for intermediate sideffects produced in the solution during execution;

  • input sideffects are NOT fed into underlying functions;

  • output sideffects are not expected from underlying functions, unless a rescheduled operation with partial outputs designates a sideffected as canceled by returning it with a falsy value (the operation must return a dictionary).

Hint

If modifications involve some input/output, prefer the sfxed() modifier.

You may still convey such relationships by including the dependency name in the string – in the end, it’s just a string – but graphtik will not enforce anything of the kind:

>>> from graphtik import sfx
>>> sfx("price[sales_df]")
sfx('price[sales_df]')

Example:

A typical use-case is to signify changes in some “global” context, outside solution:

>>> from graphtik import operation, compose, sfx
>>> @operation(provides=sfx("lights off"))  # sideffect names can be anything
... def close_the_lights():
...    pass
>>> graph = compose('strip ease',
...     close_the_lights,
...     operation(
...         name='undress',
...         needs=[sfx("lights off")],
...         provides="body")(lambda: "TaDa!")
... )
>>> graph
Pipeline('strip ease',
                 needs=[sfx('lights off')],
                 provides=[sfx('lights off'), 'body'],
                 x2 ops: close_the_lights, undress)
>>> sol = graph()
>>> sol
{'body': 'TaDa!'}

sideffect

Note

Something has to provide a sideffect for a function needing it to execute - this could be another operation, like above, or the user-inputs; just specify some truthy value for the sideffect:

>>> sol = graph.compute({sfx("lights off"): True})

Modifying existing values in solutions

graphtik.modifier.sfxed(dependency: str, sfx0: str, *sfx_list: str, keyword: Optional[str] = None, optional: Optional[bool] = None, accessor: Optional[graphtik.modifier.Accessor] = None, jsonp=None) → graphtik.modifier._Modifier

Annotates a sideffected dependency in the solution sustaining side-effects.

Parameters
  • dependency – the actual dependency receiving the sideffect, which will be fed into/out of the function.

  • sfx0 – the 1st (arbitrary object) sideffect marked as “acting” on the dependency.

  • sfx_list – more (arbitrary object) sideffects (like sfx0)

  • keyword – the name for the function argument it corresponds to. When optional, it defaults to name if falsy, so as to always behave like a kw-type arg and to preserve the fn-name if ever renamed. When not optional, it may simply be left unset.

  • accessor – the functions to access values to/from solution (see Accessor) (actually a 2-tuple with functions is ok)

  • jsonp – None (derived from name), False, str, or a collection of str/callable (the last one); see the generic modify() modifier.

Like sfx() but annotating a real dependency in the solution, allowing that dependency to be present both in needs and provides of the same function.

Example:

A typical use-case is to signify columns required to produce new ones in pandas dataframes (emulated with dictionaries):

>>> from graphtik import operation, compose, sfxed
>>> @operation(needs="order_items",
...            provides=sfxed("ORDER", "Items", "Prices"))
... def new_order(items: list) -> "pd.DataFrame":
...     order = {"items": items}
...     # Pretend we get the prices from sales.
...     order['prices'] = list(range(1, len(order['items']) + 1))
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Items"), "vat rate"],
...     provides=sfxed("ORDER", "VAT")
... )
... def fill_in_vat(order: "pd.DataFrame", vat: float):
...     order['VAT'] = [i * vat for i in order['prices']]
...     return order
>>> @operation(
...     needs=[sfxed("ORDER", "Prices", "VAT")],
...     provides=sfxed("ORDER", "Totals")
... )
... def finalize_prices(order: "pd.DataFrame"):
...     order['totals'] = [p + v for p, v in zip(order['prices'], order['VAT'])]
...     return order

To view all internal dependencies, enable DEBUG in configurations:

>>> from graphtik.config import debug_enabled
>>> with debug_enabled(True):
...     finalize_prices
FnOp(name='finalize_prices',
     needs=[sfxed('ORDER', 'Prices'), sfxed('ORDER', 'VAT')],
     _user_needs=[sfxed('ORDER', 'Prices', 'VAT')],
     _fn_needs=['ORDER'], provides=[sfxed('ORDER', 'Totals')],
     _user_provides=[sfxed('ORDER', 'Totals')],
     _fn_provides=['ORDER'],
     fn='finalize_prices')

Notice that declaring a single sideffected with many items in sfx_list, expands into multiple “singular” sideffected dependencies in the network (check needs vs _user_needs above).

>>> proc_order = compose('process order', new_order, fill_in_vat, finalize_prices)
>>> sol = proc_order.compute({
...      "order_items": ["toilet-paper", "soap"],
...      "vat rate": 0.18,
... })
>>> sol
{'order_items': ['toilet-paper', 'soap'],
 'vat rate': 0.18,
 'ORDER': {'items': ['toilet-paper', 'soap'],
           'prices': [1, 2],
           'VAT': [0.18, 0.36],
           'totals': [1.18, 2.36]}}

sideffecteds

Notice that although many functions consume & produce the same ORDER dependency, something that would have formed cycles, the wrapping operations need and provide different sideffected instances, breaking thus the cycles.
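The cycle-breaking can be sketched in plain Python: treating each sideffected instance as its own node label (the `"ORDER@…"` strings below are illustrative stand-ins, not graphtik objects) leaves the dependency graph acyclic, so a topological order exists.

```python
# Sketch: with a single "ORDER" node the dependency graph would be cyclic,
# but distinct sideffected labels give each op its own in/out nodes.
from graphlib import TopologicalSorter

needs = {  # op -> node labels it needs
    "new_order": {"order_items"},
    "fill_in_vat": {"ORDER@Items", "vat rate"},
    "finalize_prices": {"ORDER@Prices", "ORDER@VAT"},
}
provides = {  # op -> node labels it provides
    "new_order": {"ORDER@Items", "ORDER@Prices"},
    "fill_in_vat": {"ORDER@VAT"},
    "finalize_prices": {"ORDER@Totals"},
}

# Link each op to the ops producing its needs, then toposort.
graph = {}
for op, needed in needs.items():
    for producer, outs in provides.items():
        if outs & needed:
            graph.setdefault(op, set()).add(producer)
order = list(TopologicalSorter(graph).static_order())
print(order)  # new_order before fill_in_vat before finalize_prices
```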

See also

The elaborate example in Hierarchical data and further tricks section.

Resilience when operations fail (endurance)

It is possible for a pipeline to keep executing operations even if some of them raise errors, provided they are marked as endured:

>>> @operation(endured=1, provides=["space", "time"])
... def get_out():
...     raise ValueError("Quarantined!")
>>> get_out
FnOp!(name='get_out', provides=['space', 'time'], fn='get_out')

Notice the exclamation mark (!) before the parenthesis in the string representation of the operation.

>>> @operation(needs="space", provides="fun")
... def exercise(where):
...     return "refreshed"
>>> @operation(endured=1, provides="time")
... def stay_home():
...     return "1h"
>>> @operation(needs="time", provides="fun")
... def read_book(for_how_long):
...     return "relaxed"
>>> covid19 = compose("covid19", get_out, stay_home, exercise, read_book)
>>> covid19
Pipeline('covid19',
                 needs=['space', 'time'],
                 provides=['space', 'time', 'fun'],
                 x4 ops: get_out, stay_home, exercise, read_book)

Notice the thick outlines of the endured (or rescheduled, see below) operations.

When executed, the pipeline produces outputs, although one of its operations has failed:

>>> sol = covid19()
>>> sol
{'time': '1h', 'fun': 'relaxed'}

You may still abort on failures, later, by raising an appropriate exception from Solution:

>>> sol.scream_if_incomplete()
Traceback (most recent call last):
graphtik.base.IncompleteExecutionError:
Not completed x2 operations ['exercise', 'get_out'] due to x1 failures and x0 partial-ops:
  +--get_out: ValueError('Quarantined!')

Operations with partial outputs (rescheduled)

In case the outputs actually produced depend on some condition in the inputs, the solution has to reschedule the plan amidst execution, and consider the actual provides delivered:

>>> @operation(rescheduled=1,
...            needs="quarantine",
...            provides=["space", "time"],
...            returns_dict=True)
... def get_out_or_stay_home(quarantine):
...     if quarantine:
...          return {"time": "1h"}
...     else:
...          return {"space": "around the block"}
>>> get_out_or_stay_home
FnOp?(name='get_out_or_stay_home',
                     needs=['quarantine'],
                     provides=['space', 'time'],
                     fn{}='get_out_or_stay_home')
>>> @operation(needs="space", provides=["fun", "body"])
... def exercise(where):
...     return "refreshed", "strong feet"
>>> @operation(needs="time", provides=["fun", "brain"])
... def read_book(for_how_long):
...     return "relaxed", "popular physics"
>>> covid19 = compose("covid19", get_out_or_stay_home, exercise, read_book)

Depending on the “quarantine” state we get to execute a different part of the pipeline:

>>> sol = covid19(quarantine=True)
>>> sol = covid19(quarantine=False)

In both cases, a warning is raised about the missing outputs, but execution proceeds regularly, evaluating whatever is possible. You may collect a report of what has been canceled using this:

>>> print(sol.check_if_incomplete())
Not completed x1 operations ['read_book'] due to x0 failures and x1 partial-ops:
  +--get_out_or_stay_home: ['time']

In case you wish to cancel the output of a single-result operation, return the special value graphtik.NO_RESULT.
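Rescheduling can be sketched in plain Python (hypothetical names, no graphtik): an op declares its provides, but may deliver only some of them; downstream ops whose needs never materialize are canceled instead of crashing the run.

```python
# Sketch of rescheduling with partial outputs: each op is
# (name, needs, fn); fn returns a dict of whatever it produced.

def execute(ops, inputs):
    sol, canceled = dict(inputs), []
    for name, needed, fn in ops:
        if all(n in sol for n in needed):
            sol.update(fn(sol))
        else:
            canceled.append(name)           # needs never materialized
    return sol, canceled

def get_out_or_stay_home(sol):
    return {"time": "1h"} if sol["quarantine"] else {"space": "around the block"}

ops = [
    ("get_out_or_stay_home", ["quarantine"], get_out_or_stay_home),
    ("exercise", ["space"], lambda s: {"fun": "refreshed"}),
    ("read_book", ["time"], lambda s: {"fun": "relaxed"}),
]
sol, canceled = execute(ops, {"quarantine": False})
print(canceled)   # ['read_book'] -- "time" was never produced
```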

Hierarchical data and further tricks

Working with hierarchical data relies upon dependencies expressed as json pointer paths against the solution data-tree. Let’s retrofit the “weekly tasks” example from the Nesting section, above.

In the previous example, we had left out the collection of the tasks and the TODOs – this time we’re going to:

  1. properly distribute & backlog the tasks to be done across the days using sideffected dependencies to modify the original stack of tasks in-place, while the workflow is running,

  2. exemplify further the use of operation nesting & renaming, and

  3. (unstable API) access the wrapping operation and execution machinery from within the function by using task_context, and finally

  4. store the input backlog, the work done, and the TODOs from the tasks in this data-tree:

    +--backlog
    +--Monday.tasks
    +--Wednesday.tasks
    +--Tuesday.tasks
    +--daily_tasks/
       +--Monday
       +--Tuesday
       +--Wednesday
    +--weekly_tasks
    +--todos
    
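How such paths resolve into the nested solution can be sketched with two small helpers (hypothetical; graphtik has its own jsonp accessor machinery):

```python
# Sketch of json-pointer-style access into a nested solution dict.

def jsonp_set(doc: dict, path: str, value):
    *parents, leaf = path.split("/")
    for key in parents:
        doc = doc.setdefault(key, {})       # create subdocs on the way down
    doc[leaf] = value

def jsonp_get(doc: dict, path: str):
    for key in path.split("/"):
        doc = doc[key]
    return doc

sol = {"backlog": [1, 2, 3]}
jsonp_set(sol, "daily_tasks/Monday", [1, 2])
print(jsonp_get(sol, "daily_tasks/Monday"))  # [1, 2]
```

A plain name like "Monday.tasks" (with a dot) remains a single flat key, while "daily_tasks/Monday" lands inside the daily_tasks subdocument, as in the tree above.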

First, let’s build the single day’s workflow, without any nesting:

>>> from graphtik import NO_RESULT, sfxed
>>> from graphtik.base import RenArgs  # type hints for autocompletion.
>>> from graphtik.execution import task_context
>>> from graphtik.modifier import dep_renamed
>>> todos = sfxed("backlog", "todos")
>>> @operation(name="wake up",
...            needs="backlog",
...            provides=["tasks", todos],
...            rescheduled=True
... )
... def pick_tasks(backlog):
...     if not backlog:
...         return NO_RESULT
...     # Pick from backlog 1/3 of len-of-chars of my day-name.
...     n_tasks = int(len(task_context.get().op.name) / 3)
...     my_tasks, todos = backlog[:n_tasks], backlog[n_tasks:]
...     return my_tasks, todos

The actual work is emulated with a conveyor operation:

>>> do_tasks = operation(fn=None, name="work!", needs="tasks", provides="daily_tasks")
>>> weekday = compose("weekday", pick_tasks, do_tasks)

Notice that the “backlog” sideffected result of the “wake up” operation is also listed in its needs; through this trick, each day’s operation can remove the tasks it completed from the initial backlog of tasks, for the next day to pick up. The “todos” sfx is just a name denoting the kind of modification performed on the “backlog”.

Note also that the tasks passed from the “wake up” –> “work!” operations are not hierarchical, but kept “private” within each day by nesting them with a dot (.).

Now let’s clone the daily-task x3 and nest it, to make a 3-day workflow:

>>> days = ["Monday", "Tuesday", "Wednesday"]
>>> weekdays = [weekday.withset(name=d) for d in days]
>>> def nester(ra: RenArgs):
...     dep = ra.name
...     if ra.typ == "op":
...         return True  # Nest by.dot.
...     if ra.typ.endswith(".jsonpart"):
...         return False  # Don't touch the json-pointer parts.
...     if dep == "tasks":
...         return True  # Nest by.dot
...     if dep == "daily_tasks":
...         # Nest as subdoc.
...         return dep_renamed(dep, lambda n: f"{n}/{ra.parent.name}")
...     return False
>>> week = compose("week", *weekdays, nest=nester)

And this is now the pipeline for a 3-day week. Notice the daily_tasks/{day} subdoc nodes at the bottom of the diagram:

Finally combine all weekly-work using a “collector” operation:

>>> from graphtik import vararg
>>> @operation(
...     name="collect tasks",
...     needs=[todos, *(vararg(f"daily_tasks/{d}") for d in days)],
...     provides=["weekly_tasks", "todos"],
... )
... def collector(backlog, *daily_tasks):
...     return daily_tasks or (), backlog or ()
...
>>> week = compose("week", week, collector)

This is the final week pipeline:

We can now feed the week pipeline with a “workload” of 17 imaginary tasks. We know from each “wake up” operation that Monday, Tuesday & Wednesday will pick 4, 5 & 5 tasks respectively, leaving 3 tasks as “todo”:

>>> sol = week.compute({"backlog": range(17)})
>>> sol
{'backlog': range(14, 17),
 'Monday.tasks': range(0, 4),
 'daily_tasks': {'Monday': range(0, 4),
                'Tuesday': range(4, 9),
                'Wednesday': range(9, 14)},
 'Tuesday.tasks': range(4, 9),
 'Wednesday.tasks': range(9, 14),
 'weekly_tasks': (range(0, 4), range(4, 9), range(9, 14)),
 'todos': range(14, 17)}

Or we can reduce the workload, and see that Wednesday is left without any work to do:

>>> sol = week.compute(
...     {"backlog": range(9)},
...     outputs=["daily_tasks", "weekly_tasks", "todos"])

Hover over the data nodes to see the results. Specifically check the “daily_tasks” which is a nested dictionary:

>>> sol
{'daily_tasks': {'Monday': range(0, 4),
                'Tuesday': range(4, 9)},
 'weekly_tasks': (range(0, 4), range(4, 9)),
 'todos': ()}

Tip

If an operation works with dependencies only in some sub-document and below, its prefix can be factored-out as a current-working-document, an argument given when defining the operation.

Concatenating Pandas

Writing output values into jsonp paths works fine for dictionaries, but it is not always possible to modify pandas objects that way (e.g. multi-indexed objects). In that case you may concatenate the output pandas objects with those in the solution, by annotating provides with the hcat() or vcat() modifiers (which eventually select different accessors).

For instance, assuming an input document that contains 2 dataframes with the same number of rows:

/data_lake/satellite_data:   pd.DataFrame(...)
/db/planet_ephemeris:        pd.DataFrame(...)

… we can copy multiple columns from satellite_data –> planet_ephemeris, at once, with something like this:

@operation(
    needs="data_lake/satellite_data",
    provides=hcat("db/planet_ephemeris/orbitals")
)
def extract_planets_columns(satellite_df):
    orbitals_df = satellite_df.iloc[:, 3:8]  # the orbital columns
    orbitals_df.columns = pd.MultiIndex.from_product(
        [["orbitals"], orbitals_df.columns]
    )

    return orbitals_df

Hint

Notice that we used the same orbitals name, both for the sub-name in the jsonp expression, and as a new level in the multi-index columns of the orbitals_df dataframe.

That will help further down the road, to index and extract that group of columns with /db/planet_ephemeris/orbitals dependency, and continue building the network.