jip.pipelines

The JIP Pipeline module contains the classes and functions used to create pipeline graphs.

class jip.pipelines.Edge(source, target)

An edge in the pipeline graph connecting source and target nodes. The edge has optional information about the jip.options.Options that are connected through this edge.

The edge carries a set of links. Links are tuples of the form (source_option, target_option, streamable).

In addition, the edge's _group flag indicates that the two nodes linked by the edge should form a job group.

Create an option link between the source option and the target option. This also checks that source_option.source is the same as the edge's source._tool and that target_option.source is the same as the edge's target._tool.

Parameters:

Returns the first link that is set to streaming

Returns True if at least one link is set to streaming

Iterate the links associated with this edge and make sure that their values are unset in the target options.
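To make the link bookkeeping concrete, here is a minimal, self-contained sketch of an edge that stores (source_option, target_option, streamable) tuples, performs the ownership check, and answers the two streaming queries described above. The names SketchEdge, add_link, get_streaming_link and has_streaming_link are hypothetical, and the small Option class is a stand-in for jip.options.Option; this is not the jip implementation. A list replaces the set so that "first streaming link" is well defined.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Option:
    """Hypothetical stand-in for jip.options.Option: a name plus its owning tool."""
    name: str
    source: str  # the tool that owns this option


Link = Tuple[Option, Option, bool]  # (source_option, target_option, streamable)


class SketchEdge:
    """Minimal model of a pipeline edge between two tools."""

    def __init__(self, source_tool: str, target_tool: str) -> None:
        self.source_tool = source_tool
        self.target_tool = target_tool
        self._links: List[Link] = []  # insertion-ordered so "first" is well defined
        self._group = False           # True if both nodes should share one job

    def add_link(self, source_option: Option, target_option: Option,
                 streamable: bool = False) -> Link:
        # Mirror the documented ownership check on both ends of the link.
        if source_option.source != self.source_tool:
            raise ValueError("source option does not belong to the edge source")
        if target_option.source != self.target_tool:
            raise ValueError("target option does not belong to the edge target")
        link = (source_option, target_option, streamable)
        if link not in self._links:  # behave like a set: no duplicate links
            self._links.append(link)
        return link

    def get_streaming_link(self) -> Optional[Link]:
        # Return the first link flagged as streaming, or None.
        return next((link for link in self._links if link[2]), None)

    def has_streaming_link(self) -> bool:
        return self.get_streaming_link() is not None
```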

class jip.pipelines.Job(pipeline=None, **kwargs)

Container class that wraps job meta-data.

The pipeline job extends the general jip.profiles.Profile so that you can create new pipeline nodes from the job. Those nodes then hold a reference to the profile, and all customizations of the profile are applied to the node.

bash(command, **kwargs)

Create a new bash job.

Parameters:
  • command – the bash command
  • kwargs – keyword arguments passed on to the bash job
Returns:

the newly created node

Return type:

Node

name

The job's name

Getter:access the job's name
Setter:set the job's name
Type:string
run(*args, **kwargs)

Delegates to Pipeline.run() and runs the specified tool using this job's environment configuration

Parameters:
  • args – args passed on to the pipeline run method
  • kwargs – kwargs passed on to the pipeline run method
Returns:

the newly created node

Return type:

Node
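The delegation from Job.run() and Job.bash() to Pipeline.run() can be sketched with a few stand-in classes. SketchJob, SketchNode, SketchPipeline and the threads property are hypothetical. The sketch follows the Pipeline.job() description, where each node receives its own copy of the profile. The class description above speaks of a reference instead, so treat the copy semantics as an assumption. This is not the jip implementation.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, List, Optional


@dataclass
class SketchJob:
    """Hypothetical job profile: execution settings plus a back-reference to a pipeline."""
    pipeline: "SketchPipeline"
    name: Optional[str] = None
    threads: int = 1  # example profile property, assumed for illustration

    def run(self, tool_name: str, **kwargs: Any) -> "SketchNode":
        # Delegate to the pipeline, passing this profile along as _job.
        return self.pipeline.run(tool_name, _job=self, **kwargs)

    def bash(self, command: str, **kwargs: Any) -> "SketchNode":
        return self.run("bash", cmd=command, **kwargs)


@dataclass
class SketchNode:
    tool_name: str
    options: Dict[str, Any]
    job: Optional[SketchJob]


class SketchPipeline:
    def __init__(self) -> None:
        self.nodes: List[SketchNode] = []

    def job(self, **kwargs: Any) -> SketchJob:
        # Creating a profile does not touch any existing node.
        return SketchJob(pipeline=self, **kwargs)

    def run(self, tool_name: str, _job: Optional[SketchJob] = None,
            **kwargs: Any) -> SketchNode:
        # Each node gets its own copy of the profile (assumed semantics).
        profile = replace(_job) if _job is not None else None
        node = SketchNode(tool_name, kwargs, profile)
        self.nodes.append(node)
        return node
```

Because the node holds a copy, later edits to the job object in this sketch do not leak into nodes that were already created.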

class jip.pipelines.Node(tool, graph, index=-1)

A single node in the pipeline graph.

If the node is linked to a jip.tools.Tool instance, attributes are resolved using the tool's options, and the jip.options.Option instances are returned. This mechanism is used to automatically create edges between tools when their options are referenced. These links are stored on the Edge. If no edge exists, one will be created.

children()

Yields all child nodes of this node

Returns:generator for all child nodes
Return type:generator for Node
depends_on(*args)

Add an explicit dependency between this node and the other node. The function accepts multiple values so you can specify multiple parents at once.

Parameters:*args – all parent nodes.

Find a link in the incoming edges where the target option is the given option

Parameters:option (jip.options.Option) – the option to search for
Returns:link instance for the given option or None

Find a link in the outgoing edges where the source option is the given option

Parameters:option (jip.options.Option) – the option to search for
Returns:link instance for the given option or None
get_stream_input()

Returns a tuple of an option and a node, where the option supports streaming and the node is a parent node of this node. If no such combination exists, (None, None) is returned.

Returns:tuple of (Option, Node) where the option supports streaming and the Node is a parent node.
group(other)

Groups this node and the other node. This creates a dependency between this node and the other node and enables grouping, so the two nodes will be executed in the same job. The other node is returned so that group chains can be created easily.

Parameters:other (Node) – the child node
Returns other:the other node
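Returning the child is what makes chained calls such as a.group(b).group(c) possible. The following sketch models only that mechanic: a dependency edge plus a group flag stored in a shared dictionary. SketchNode and its edge dictionary are hypothetical stand-ins, not jip's Node.

```python
from typing import Dict, Tuple

EdgeMap = Dict[Tuple[str, str], Dict[str, bool]]  # (parent, child) -> edge flags


class SketchNode:
    """Hypothetical node that records dependency edges and group flags in a shared map."""

    def __init__(self, name: str, edges: EdgeMap) -> None:
        self.name = name
        self._edges = edges

    def depends_on(self, *parents: "SketchNode") -> None:
        # One edge per parent; existing edges are left untouched.
        for parent in parents:
            self._edges.setdefault((parent.name, self.name), {"group": False})

    def group(self, other: "SketchNode") -> "SketchNode":
        # other becomes a child of self, and the edge is flagged for grouping.
        other.depends_on(self)
        self._edges[(self.name, other.name)]["group"] = True
        return other  # returning the child enables group chains
```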
has_incoming(other=None, link=None, stream=None, value=None)

Returns True if this node has an incoming edge where the parent node is the given other node. If link is specified, it has to be a tuple with the source and the target option names, and the detected edge has to carry the specified link. If stream is not None, the link is checked for whether it is a streaming link or not.

If no other node is specified, this returns True if this node has any incoming edges.

If value is specified, the delegated value has to be equal to the specified value.

You can use the incoming edge check like this:

node.has_incoming(other, ('output', 'input'), False, "data.txt")

This returns True if the node has an incoming edge from the other node, the edge links other.output to node.input, no stream is passed, and the actual value is “data.txt”.

Parameters:
  • other (Node) – the potential parent node
  • link – optional tuple with source and target option names
  • stream – boolean that ensures that the link is streaming or not, depending on the specified value
  • value – specify an optional value that is compared against the delegated value
Returns:

True if the edge exists
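The combination of filters can be easier to follow as code. This sketch applies them to plain data classes: SketchLink, SketchEdge and the free function has_incoming are hypothetical stand-ins for jip's edges and links. The sketch makes one assumption the text leaves open: when link is omitted, stream and value are matched against any link on the edge. As in the documented signature, None means "do not filter", so a None value cannot be matched explicitly.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class SketchLink:
    source_option: str  # option name on the parent node
    target_option: str  # option name on this node
    stream: bool
    value: Any          # the value delegated from source to target


@dataclass
class SketchEdge:
    source: str  # parent node name
    links: List[SketchLink] = field(default_factory=list)


def has_incoming(incoming: List[SketchEdge], other: Optional[str] = None,
                 link: Optional[Tuple[str, str]] = None,
                 stream: Optional[bool] = None, value: Any = None) -> bool:
    if other is None:
        # Documented shortcut: any incoming edge counts.
        return len(incoming) > 0
    for edge in incoming:
        if edge.source != other:
            continue
        if link is None and stream is None and value is None:
            return True
        for candidate in edge.links:
            if link is not None and (candidate.source_option,
                                     candidate.target_option) != link:
                continue
            if stream is not None and candidate.stream != stream:
                continue
            if value is not None and candidate.value != value:
                continue
            return True
    return False
```

has_outgoing is the mirror image, filtering on the child node instead of the parent.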

has_outgoing(other=None, link=None, stream=None, value=None)

Returns True if this node has an outgoing edge where the child node is the given other node. If link is specified, it has to be a tuple with the source and the target option names, and the detected edge has to carry the specified link. If stream is not None, the link is checked for whether it is a streaming link or not.

If no other node is specified, this returns True if this node has any outgoing edges.

If value is specified, the delegated value has to be equal to the specified value.

You can use the outgoing edge check like this:

node.has_outgoing(other, ('output', 'input'), False, "data.txt")

This returns True if the node has an outgoing edge to the other node, the edge links node.output to other.input, no stream is passed, and the actual value is “data.txt”.

Parameters:
  • other (Node) – the potential child node
  • link – optional tuple with source and target option names
  • stream – boolean that ensures that the link is streaming or not, depending on the specified value
  • value – specify an optional value that is compared against the delegated value
Returns:

True if the edge exists

incoming()

Yields all incoming edges of this node

Returns:generator for all incoming edges
Return type:generator for Edge
job

The node's job profile

Getter:Returns the node's job profile
Type:jip.pipelines.Job
name

Get a unique name for this node.

The unique name is created based on the job name. If no job name is assigned, the tool name is used. If the resulting name is not unique within the pipeline context, the node's index is appended to it.

Getter:returns a unique name for this node
Type:string
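The naming rule is short enough to sketch directly, together with a name-based lookup in the spirit of Pipeline.get(). SketchNode, unique_names and get are hypothetical helpers written for illustration. They encode the documented behaviour: the job name wins over the tool name, and ".<index>" is appended only when a name occurs more than once.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchNode:
    tool_name: str
    index: int
    job_name: Optional[str] = None

    @property
    def base_name(self) -> str:
        # A job name, if assigned, takes precedence over the tool name.
        return self.job_name or self.tool_name


def unique_names(nodes: List[SketchNode]) -> List[str]:
    """Append '.<index>' only to names that occur more than once."""
    counts = Counter(node.base_name for node in nodes)
    return [
        node.base_name if counts[node.base_name] == 1
        else f"{node.base_name}.{node.index}"
        for node in nodes
    ]


def get(nodes: List[SketchNode], name: str) -> SketchNode:
    # Look a node up by its resolved unique name.
    for node, node_name in zip(nodes, unique_names(nodes)):
        if node_name == name:
            return node
    raise LookupError(name)
```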
on_success(tool=None, **kwargs)

Create an embedded pipeline that will be submitted or executed after this node was successfully executed. The function returns a tuple: (pipeline, node)

Parameters:
  • tool – the tool to run
  • kwargs – option arguments for the tool
Returns:

tuple of (pipeline, node)

outgoing()

Yields all outgoing edges of this node

Returns:generator for all outgoing edges
Return type:generator for Edge
parents()

Yields all parent nodes of this node

Returns:generator for all parent nodes
Return type:generator for Node
pipeline_name(name)

Set the user defined name for the pipeline this node belongs to

set(name, value, set_dep=False, allow_stream=True, append=False)

Set an option

class jip.pipelines.Pipeline(cwd=None)

A pipeline is a directed acyclic graph of Nodes and edges

add(tool, _job=None)

Add a tool or a node to the pipeline. If the given value is not a node, it is wrapped in a new node instance and then added to the pipeline. The newly created node is returned.

Note that nodes map uniquely to tool instances, so the same instance cannot be added to the pipeline twice. If you add it again, no new node is created and the existing node is returned instead.

Parameters:tool (jip.tools.Tool or Node) – the tool or node
Returns:the new node
Return type:Node
add_edge(source, target)

Adds an edge between the source and the target if no such edge exists. Otherwise the existing edge will be returned.

Parameters:
  • source (Node or Tool) – the source node or tool instance
  • target (Node or Tool) – the target node or tool instance
Returns:

the edge between source and target

Raises:
  • LookupError – if the source or target node could not be found
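The idempotent behaviour of add() and add_edge() can be sketched with two dictionaries keyed by object identity. SketchNode and SketchPipeline are hypothetical stand-ins, and returning a (source, target) tuple as the "edge" is a simplification. What the sketch does mirror from the text: the same tool always maps to the same node, an existing edge is returned instead of duplicated, and unknown endpoints raise LookupError.

```python
from typing import Dict, Tuple


class SketchNode:
    """Hypothetical node wrapper around a tool instance."""

    def __init__(self, tool: object) -> None:
        self.tool = tool


class SketchPipeline:
    """Hypothetical graph keyed by tool identity, mirroring add() and add_edge()."""

    def __init__(self) -> None:
        self._nodes: Dict[int, SketchNode] = {}  # id(tool) -> node
        self._edges: Dict[Tuple[int, int], Tuple[SketchNode, SketchNode]] = {}

    def add(self, value: object) -> SketchNode:
        # Wrap plain tools; keep the existing node if this tool was added before.
        node = value if isinstance(value, SketchNode) else SketchNode(value)
        return self._nodes.setdefault(id(node.tool), node)

    def _lookup(self, value: object) -> SketchNode:
        tool = value.tool if isinstance(value, SketchNode) else value
        try:
            return self._nodes[id(tool)]
        except KeyError:
            raise LookupError(f"{tool!r} is not part of this pipeline") from None

    def add_edge(self, source: object, target: object) -> Tuple[SketchNode, SketchNode]:
        src, dst = self._lookup(source), self._lookup(target)
        # Reuse the existing edge instead of creating a duplicate.
        return self._edges.setdefault((id(src), id(dst)), (src, dst))
```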

bash(command, **kwargs)

Create a bash job that executes a bash command.

This is a fast way to build pipelines that execute shell commands. The function wraps the given command string in the bash tool, which is defined with input, output, and outfile options. Input and output default to stdin and stdout.

Parameters:
  • command (string) – the bash command to execute
  • kwargs – arguments passed into the context used to render the bash command. input, output, and outfile are passed as options to the bash tool that is used to run the command
Returns:

a new pipeline node that represents the bash job

Return type:

jip.pipelines.Node
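As a rough model of what bash() does with its arguments, the sketch below renders the command from the keyword arguments and separates out the three tool options with their documented defaults. render_bash is a hypothetical helper. Python's string.Template is only a stand-in for jip's own template engine; it happens to share the ${name} placeholder syntax used in the expand() example further down.

```python
import string
import sys
from typing import Any, Dict


def render_bash(command: str, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical sketch: render the command and collect the bash tool options."""
    context = dict(kwargs)  # every keyword argument is available to the template
    options = {
        "input": context.get("input", sys.stdin),    # documented default: stdin
        "output": context.get("output", sys.stdout),  # documented default: stdout
        "outfile": context.get("outfile"),
    }
    # safe_substitute leaves unknown ${...} placeholders untouched.
    cmd = string.Template(command).safe_substitute(context)
    return {"cmd": cmd, **options}
```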

context(context)

Update the global context of the pipeline and add the values from the given context

Parameters:context – the context
edges

Access all edges in the current pipeline graph as a list of Edge

Getter:get a list of all edges
Type:list of Edge
exclude(excludes)

Takes a list of node names and removes all nodes and their successors from the graph.

Parameters:excludes (list of string) – list of node names
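Removing a node "and its successors" amounts to a breadth-first walk from the excluded names. The sketch works on a plain adjacency map (node name to child names) rather than jip's Node objects; the function name and graph representation are assumptions for illustration.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


def exclude(children: Dict[str, List[str]], excludes: Iterable[str]) -> Dict[str, List[str]]:
    """Return a copy of the graph without the excluded nodes and everything downstream."""
    doomed: Set[str] = set()
    queue = deque(excludes)
    while queue:
        name = queue.popleft()
        if name in doomed:
            continue
        doomed.add(name)
        queue.extend(children.get(name, []))  # successors are removed too
    return {
        name: [kid for kid in kids if kid not in doomed]
        for name, kids in children.items()
        if name not in doomed
    }
```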
expand(context=None, validate=True, _find_dup=True, _check_fanout=True)

This modifies the current graph state and applies fan_out operations to nodes whose singleton options are populated with lists. An exception is raised if a node has more than one option to expand and those options do not contain the same number of elements.

You can specify a context that will be used additionally to resolve template variables and references in node options. This allows you to give the template system access to your local environment. For example:

>>> p = Pipeline()
>>> a = "myinput.txt"
>>> p.bash('wc -l ${a}')
bash
>>> p.expand(locals())
False
>>> assert p.get("bash").cmd.get() == 'wc -l myinput.txt'
Parameters:
  • validate – set this to False to disable validation
  • context – specify a local context that is taken into account in template and option rendering
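The fan-out rule for a single node can be sketched as follows: every singleton option holding a list is zipped element-wise, all other options are copied into each expanded node, and mismatched lengths raise an error. fan_out, the singletons parameter and the use of ValueError are assumptions made for this sketch; jip's expand() works on whole graphs and may raise a different exception type.

```python
from typing import Any, Dict, List, Set


def fan_out(options: Dict[str, Any], singletons: Set[str]) -> List[Dict[str, Any]]:
    """Sketch: one option dict per element of the list-valued singleton options."""
    expanding = {k: v for k, v in options.items()
                 if k in singletons and isinstance(v, list)}
    if not expanding:
        return [dict(options)]  # nothing to expand: the node stays as it is
    lengths = {len(v) for v in expanding.values()}
    if len(lengths) > 1:
        raise ValueError(f"cannot fan out options of different lengths: {sorted(expanding)}")
    count = lengths.pop()
    # Element i of every expanding option goes into the i-th copy.
    return [{**options, **{k: v[i] for k, v in expanding.items()}}
            for i in range(count)]
```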
get(name)

Find a node by tool or node name including its node index.

This searches the pipeline's nodes for a node whose name equals the given name. The full name consists of the tool name and, if there is more than one node with the same name, the node index. A node index is typically assigned and used after pipeline expansion, which means you might have to append the correct index to the name of the node you are looking for.

This is necessary because multiplexing the pipeline cannot always guarantee unique node names: nodes might get duplicated based on the input of the pipeline. Therefore, a unique node index is appended to the node name. You can inspect the pipeline nodes and their names by iterating over the nodes() method. Printing a node, or calling str on it, resolves the current node name.

If you assign a job name to the node, it overrides the node name and is used instead. Note, however, that the same indexing rules apply: if the graph contains more than one node with the same name, the node index is appended to the node/job name.

If the index is appended, the node name always has the form “<name>.<index>”.

For example, without any special assignment, the node name defaults to the name of the tool. If there is only one node with that name, no modifications are applied and the node index is ignored:

>>> p = Pipeline()
>>> p.run('bash', cmd='ls')
bash
>>> p.expand()
False
>>> assert p.get("bash") is not None
Parameters:name – node name
Returns:the node with the given name
Raises:
  • LookupError – if no such node exists
get_edge(source, target)

Returns the edge between source and target or raises a KeyError if no such edge exists.

Parameters:
  • source (Node or Tool) – the source node or tool instance
  • target (Node or Tool) – the target node or tool instance
Returns:

the edge between source and target

Raises:
  • LookupError – if the source or target node could not be found
  • KeyError – if no edge between source and target exists
groups()

Sorts the nodes in topological order and then groups nodes together if they have a dependency and at least one of the dependency options is set for streaming.

Yields lists of nodes. Each list represents a group of tools that need to be executed in parallel to be able to pipe all streams.
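One way to model this grouping is a union-find over the streaming edges combined with a topological sort over all edges. The sketch below uses the standard library's graphlib.TopologicalSorter (Python 3.9+). stream_groups and its (source, target, streaming) edge tuples are hypothetical; the jip method may order or merge groups differently.

```python
from graphlib import TopologicalSorter
from typing import Dict, Iterator, List, Tuple


def stream_groups(nodes: List[str],
                  edges: List[Tuple[str, str, bool]]) -> Iterator[List[str]]:
    """Yield groups of nodes joined by streaming edges, each in topological order."""
    parent = {n: n for n in nodes}

    def find(n: str) -> str:
        # Union-find root lookup with path halving.
        while parent[n] != n:
            parent[n] = parent[parent[n]]
            n = parent[n]
        return n

    # Merge the endpoints of every streaming edge into one group.
    for source, target, streaming in edges:
        if streaming:
            parent[find(source)] = find(target)

    # Every edge is a dependency, streaming or not.
    sorter = TopologicalSorter({n: set() for n in nodes})
    for source, target, _ in edges:
        sorter.add(target, source)

    groups: Dict[str, List[str]] = {}
    for node in sorter.static_order():
        groups.setdefault(find(node), []).append(node)  # dicts keep first-seen order
    yield from groups.values()
```

Using union-find rather than "join the first streaming parent" keeps fan-in cases correct, where two streaming parents feed the same child.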

job(*args, **kwargs)

Create a new job profile.

The job profile can be used to customize the execution behaviour of a job. Calling this method only creates a new job profile; it does not apply the profile to any node in the graph. You can, however, create nodes from the job profile using Job.run() or Job.bash(). These nodes will then get a copy of the job profile, and the profile's properties will be applied before job execution.

Parameters:
  • args – args passed to Job
  • kwargs – kwargs passed to Job
Returns:

new job profile

Return type:

Job

name(name)

Sets the name of the pipeline and ensures that all nodes in the pipeline reference the pipeline name.

Parameters:name (string) – the name of the pipeline
nodes()

Generator that yields the nodes of this pipeline

Returns nodes:the nodes of this pipeline
Return type:list of Node
pipeline_name(name)

Set the user defined name of the pipeline

Parameters:name (string) – the user defined name of the pipeline
remove(tool, remove_links=True)

Remove the given tool or node from the pipeline graph.

Parameters:tool – tool or node
run(_tool_name, _job=None, **kwargs)

Find the tool specified by name and add it as a node to the pipeline graph.

All additional keyword arguments are passed as option configuration to the tool instance, allowing you to configure your tool when you create it.

Note that the tool's validate() method is called here silently. Exceptions are caught and logged. This is necessary to allow tools to initialize themselves when they are added to a pipeline.

Parameters:
  • _tool_name – a Tool instance or a tool name
  • kwargs – all keyword arguments are passed to the tool as option configurations
Returns:

the newly added node

Return type:

Node

Raises:
  • jip.tool.ToolNotFoundException – if the specified tool could not be found

skip(excludes)

Takes a list of node names or node instances, removes each node, and tries to connect the node's parents to its children.

Parameters:excludes (list of string) – list of node names
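The rewiring step can be sketched on a plain adjacency map (node name to a set of child names). skip is a hypothetical helper here. It ignores option links and streaming, which the real method has to take into account and which is presumably why the text says it only "tries" to connect parents and children.

```python
from typing import Dict, Iterable, Set


def skip(children: Dict[str, Set[str]], excludes: Iterable[str]) -> Dict[str, Set[str]]:
    """Drop each excluded node and wire its parents directly to its children."""
    graph = {name: set(kids) for name, kids in children.items()}  # do not mutate input
    for name in excludes:
        kids = graph.pop(name, set())
        for parent_kids in graph.values():
            if name in parent_kids:
                parent_kids.discard(name)
                parent_kids.update(kids)  # bypass the removed node
    return graph
```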
topological_order()

Generator function that yields the nodes in the graph in topological order.

Please note that this function does not cache the order and recalculates it on each call. If you know the pipeline graph will not change any more and you have to iterate the nodes in order more than once, you might want to cache the results:

>>> pipeline = Pipeline()
>>> ordered = list(pipeline.topological_order())
Returns:yields nodes in topological order
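For reference, a generator-based topological order can be written with Kahn's algorithm. Like the documented method, it recomputes the order on every call, so wrapping it in list() is the way to cache a result. The function below and its adjacency-map input are illustrative assumptions; jip does not necessarily use this algorithm internally.

```python
from collections import deque
from typing import Dict, Iterator, List


def topological_order(children: Dict[str, List[str]]) -> Iterator[str]:
    """Kahn's algorithm over a node-name -> child-names map."""
    indegree = {name: 0 for name in children}
    for kids in children.values():
        for kid in kids:
            indegree[kid] = indegree.get(kid, 0) + 1
    ready = deque(name for name, degree in indegree.items() if degree == 0)
    emitted = 0
    while ready:
        name = ready.popleft()
        emitted += 1
        yield name
        for kid in children.get(name, []):
            indegree[kid] -= 1
            if indegree[kid] == 0:  # all parents emitted
                ready.append(kid)
    if emitted != len(indegree):
        raise ValueError("graph contains a cycle")
```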
validate()

Validate all nodes in the graph
