The JIP Pipeline module contains the classes and functions used to create pipeline graphs.
An edge in the pipeline graph connecting a source and a target node. The edge optionally carries information about the jip.options.Option instances that are connected through it.
The edge carries a set of links. Links are tuples of the form (source_option, target_option, streamable).
In addition, the edge's _group flag indicates that the two nodes linked by the edge should form a job group.
Create an option link between the source option and the target option. This also checks that the source of source_option is the edge's source._tool and that the source of target_option is the edge's target._tool.
Returns the first link that is set to streaming.
Returns True if at least one link is set to streaming.
Iterate the links associated with this edge and make sure that their values are unset in the target options.
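To make the link bookkeeping concrete, here is a minimal sketch in plain Python, assuming options can be represented by their names. EdgeSketch, Link, add_link, get_streaming_link and has_streaming_link are hypothetical names chosen for illustration, not necessarily the jip API, and the source-tool checks described for link creation are omitted.

```python
from typing import NamedTuple, Optional, Set


class Link(NamedTuple):
    """Hypothetical stand-in for a (source_option, target_option, streamable) link."""
    source_option: str
    target_option: str
    streamable: bool


class EdgeSketch:
    """Minimal model of a pipeline edge; options are represented by their names."""

    def __init__(self, source: str, target: str) -> None:
        self.source = source
        self.target = target
        self._links: Set[Link] = set()
        self._group = False  # True if source and target should form a job group

    def add_link(self, source_option: str, target_option: str,
                 streamable: bool = False) -> Link:
        link = Link(source_option, target_option, streamable)
        self._links.add(link)
        return link

    def get_streaming_link(self) -> Optional[Link]:
        # Links live in a set, so "first" simply means the first one found.
        for link in self._links:
            if link.streamable:
                return link
        return None

    def has_streaming_link(self) -> bool:
        return self.get_streaming_link() is not None
```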
Container class that wraps job meta-data.
The pipeline job extends the general jip.profiles.Profile so that you can create new pipeline nodes from the job. Those nodes hold a reference to the profile, and all customizations of the profile are applied to the node.
Create a new bash job.
Returns: the newly created node
The job's name.
Getter: access the job's name
Setter: set the job's name
Type: string
Delegates to Pipeline.run() and runs the specified tool using this job's environment configuration.
Returns: the newly created node
A single node in the pipeline graph.
If the node is linked to a jip.tools.Tool instance, attributes are resolved using the tool's options, and the jip.options.Option instances are returned. This mechanism is used to create edges between tools automatically when their options are referenced. These links are stored on the Edge; if no edge exists yet, one is created.
Yields all children of this node.
Returns: generator for all child nodes
Return type: generator for Node
Add explicit dependencies between this node and one or more parent nodes. The function accepts multiple values, so you can specify several parents at once.
Parameters: *args – all parent nodes
Find a link in the incoming edges where the target option is the given option.
Parameters: option (jip.options.Option) – the option to search for
Returns: link instance for the given option, or None
Find a link in the outgoing edges where the source option is the given option.
Parameters: option (jip.options.Option) – the option to search for
Returns: link instance for the given option, or None
Returns a tuple of an option and a node, where the option supports streaming and the node is a parent of this node. If no such combination exists, the tuple (None, None) is returned.
Returns: tuple of (Option, Node) where the option supports streaming and the Node is a parent node
Groups this node and the other node. This creates a dependency between this node and the other node and enables grouping, so the two nodes are executed in the same job. The other node is returned so group chains can be created easily.
Parameters: other (Node) – the child node
Returns: the other node
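The chaining behaviour can be sketched as follows, assuming a node only needs to record its parents and group partners. NodeSketch, depends_on and grouped_with are hypothetical names for illustration; the real Node stores this information on edges (the _group flag) rather than in plain lists.

```python
from typing import List


class NodeSketch:
    """Hypothetical node that records explicit parents and group partners."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.parents: List["NodeSketch"] = []
        self.grouped_with: List["NodeSketch"] = []

    def depends_on(self, *parents: "NodeSketch") -> None:
        # Explicit dependencies: several parents can be given at once.
        for parent in parents:
            if parent not in self.parents:
                self.parents.append(parent)

    def group(self, other: "NodeSketch") -> "NodeSketch":
        # The child depends on this node, and both should run in the same job.
        other.depends_on(self)
        self.grouped_with.append(other)
        return other  # returning the child enables a.group(b).group(c)
```

Because group() returns its argument, a.group(b).group(c) builds the chain a → b → c in one expression.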
Returns True if this node has an incoming edge whose parent node is the given other node. If link is specified, it has to be a tuple with the source and the target option names, and the detected edge has to carry the specified link. If stream is not None, the link is also checked for whether it is a streaming link.
If no other node is specified, this returns True if this node has any incoming edges.
If value is specified, the delegate value has to be equal to the specified value.
You can use the incoming edge check like this:
node.has_incoming(other, ('output', 'input'), False, "data.txt")
This returns True if node has an incoming edge from other, the edge links other.output to node.input, no stream is passed, and the actual value is “data.txt”.
Returns: True if the edge exists
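The matching rules for the incoming edge check can be sketched as a standalone function. This assumes each incoming edge is flattened into a parent name plus link records that carry option names, the streaming flag and the delegated value; IncomingEdge, LinkInfo and this has_incoming signature are hypothetical, not the jip data model.

```python
from typing import Any, Iterable, NamedTuple, Optional, Tuple


class LinkInfo(NamedTuple):
    # Hypothetical flattened link: option names, streaming flag, delegated value.
    source_option: str
    target_option: str
    streamable: bool
    value: Any


class IncomingEdge(NamedTuple):
    parent: str
    links: Tuple[LinkInfo, ...]


def has_incoming(edges: Iterable[IncomingEdge],
                 other: Optional[str] = None,
                 link: Optional[Tuple[str, str]] = None,
                 stream: Optional[bool] = None,
                 value: Any = None) -> bool:
    edges = list(edges)
    if other is None:
        return len(edges) > 0  # any incoming edge at all
    for edge in edges:
        if edge.parent != other:
            continue
        if link is None:
            return True  # an edge from other is enough
        for lk in edge.links:
            if (lk.source_option, lk.target_option) != link:
                continue
            if stream is not None and lk.streamable != stream:
                continue
            if value is not None and lk.value != value:
                continue
            return True
    return False
```

The outgoing check is symmetric: match on the child node instead of the parent.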
Returns True if this node has an outgoing edge whose child node is the given other node. If link is specified, it has to be a tuple with the source and the target option names, and the detected edge has to carry the specified link. If stream is not None, the link is also checked for whether it is a streaming link.
If no other node is specified, this returns True if this node has any outgoing edges.
If value is specified, the delegate value has to be equal to the specified value.
You can use the outgoing edge check like this:
node.has_outgoing(other, ('output', 'input'), False, "data.txt")
This returns True if node has an outgoing edge to other, the edge links node.output to other.input, no stream is passed, and the actual value is “data.txt”.
Returns: True if the edge exists
Yields all incoming edges of this node.
Returns: generator for all incoming edges
Return type: generator for Edge
The node's job profile.
Getter: returns the node's job profile
Type: jip.pipelines.Job
Get a unique name for this node.
The unique name is based on the job name. If no job name is assigned, the tool name is used. If the resulting name is not unique within the pipeline context, the node's index is appended to it.
Getter: returns a unique name for this node
Type: string
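The naming rule can be sketched as a pure function, assuming each node is described by its tool name, an optional job name and its node index. unique_names is a hypothetical helper; it applies the "<name>.<index>" form only when the base name is shared by several nodes.

```python
from collections import Counter
from typing import List, Optional, Tuple


def unique_names(nodes: List[Tuple[str, Optional[str], int]]) -> List[str]:
    """nodes holds hypothetical (tool_name, job_name or None, node_index) triples."""
    # The job name wins over the tool name when one is assigned.
    base = [job_name or tool_name for tool_name, job_name, _ in nodes]
    counts = Counter(base)
    names = []
    for name, (_, _, index) in zip(base, nodes):
        # Append the index only when the base name is ambiguous.
        names.append(f"{name}.{index}" if counts[name] > 1 else name)
    return names
```

For example, two bash nodes and one wc node with the job name count would resolve to bash.0, bash.1 and count.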
Create an embedded pipeline that will be submitted or executed after this node has been executed successfully. The function returns a tuple: (pipeline, node).
Returns: tuple of (pipeline, node)
Yields all outgoing edges of this node.
Returns: generator for all outgoing edges
Return type: generator for Edge
Yields all parent nodes of this node.
Returns: generator for all parent nodes
Return type: generator for Node
Set the user-defined name of the pipeline this node belongs to.
Set an option.
A pipeline is a directed acyclic graph of Nodes and Edges.
Add a tool or a node to the pipeline. If the given value is not a node, it is wrapped in a new node instance and then added to the pipeline. The newly created node is returned.
Note that nodes map uniquely to tool instances, so you cannot add the same tool instance to the pipeline twice. If you try, no new node is added and the existing node is returned.
Parameters: tool (jip.tools.Tool or Node) – the tool or node
Returns: the new node, or the already existing node for this tool
Return type: Node
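A minimal sketch of the "one node per tool instance" rule, assuming nodes are indexed by the identity of the tool they wrap. Node and PipelineSketch here are hypothetical stand-ins for illustration; keying on id() is safe in this sketch only because each stored node keeps its tool alive.

```python
from typing import Any, Dict


class Node:
    """Hypothetical wrapper holding a reference to exactly one tool instance."""

    def __init__(self, tool: Any) -> None:
        self._tool = tool


class PipelineSketch:
    def __init__(self) -> None:
        # Keyed by object identity, so equal but distinct tools stay separate.
        self._nodes: Dict[int, Node] = {}

    def add(self, value: Any) -> Node:
        tool = value._tool if isinstance(value, Node) else value
        existing = self._nodes.get(id(tool))
        if existing is not None:
            return existing  # same tool instance: reuse the existing node
        node = value if isinstance(value, Node) else Node(tool)
        self._nodes[id(tool)] = node
        return node
```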
Adds an edge between the source and the target if no such edge exists. Otherwise the existing edge will be returned.
Returns: the edge between source and target
Raises LookupError: if the source or target node could not be found
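The "reuse or create" behaviour of edge creation can be sketched with a dictionary keyed by (source, target) pairs. GraphSketch, its nodes set and the dictionary used as an edge record are hypothetical simplifications, not the jip Edge class.

```python
from typing import Dict, Set, Tuple


class GraphSketch:
    def __init__(self) -> None:
        self.nodes: Set[str] = set()
        self.edges: Dict[Tuple[str, str], dict] = {}

    def add_edge(self, source: str, target: str) -> dict:
        if source not in self.nodes or target not in self.nodes:
            raise LookupError(f"node not found: {source!r} or {target!r}")
        # setdefault returns the stored edge if one already exists.
        return self.edges.setdefault((source, target),
                                     {"source": source, "target": target})
```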
Create a bash job that executes a bash command.
This is a fast way to build pipelines that execute shell commands. The function wraps the given command string in the bash tool, which defines the input, output, and outfile options. Input and output default to stdin and stdout.
Returns: a new pipeline node that represents the bash job
Update the global context of the pipeline and add the values from the given context.
Parameters: context – the context
Access all edges in the current pipeline graph as a list of Edge instances.
Getter: get a list of all edges
Type: list of Edge
Takes a list of node names and removes all nodes and their successors from the graph.
Parameters: excludes (list of string) – list of node names
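Removing nodes together with all of their successors amounts to a breadth-first walk over the children. The sketch below assumes the graph is given as a mapping from node name to child names; exclude is a hypothetical standalone function that returns a pruned copy instead of modifying a pipeline in place.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


def exclude(children: Dict[str, List[str]],
            excludes: Iterable[str]) -> Dict[str, List[str]]:
    """Return a copy of the graph without the excluded nodes and their successors."""
    removed: Set[str] = set()
    queue = deque(excludes)
    while queue:
        name = queue.popleft()
        if name in removed:
            continue
        removed.add(name)
        queue.extend(children.get(name, []))  # successors go too
    return {
        node: [c for c in kids if c not in removed]
        for node, kids in children.items()
        if node not in removed
    }
```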
This modifies the current graph state and applies fan_out operations to nodes with singleton options that are populated with lists. An exception is raised if a node has more than one option that should be expanded and those options do not contain the same number of elements.
You can specify a context that will be used additionally to resolve template variables and references in node options. This allows you to give the template system access to your local environment. For example:
>>> p = Pipeline()
>>> a = "myinput.txt"
>>> p.bash('wc -l ${a}')
bash
>>> p.expand(locals())
False
>>> assert p.get("bash").cmd.get() == 'wc -l myinput.txt'
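The fan-out rule can be sketched on a plain dictionary of option values, assuming the names of the singleton options are known up front. fan_out is a hypothetical helper; raising ValueError for mismatched lengths is an assumption, since the text above only says that an exception is raised.

```python
from typing import Any, Dict, List


def fan_out(options: Dict[str, Any], singletons: List[str]) -> List[Dict[str, Any]]:
    """Expand singleton options holding lists into one option dict per element."""
    lists = {name: options[name] for name in singletons
             if isinstance(options.get(name), list)}
    if not lists:
        return [dict(options)]  # nothing to expand
    lengths = {len(values) for values in lists.values()}
    if len(lengths) > 1:
        raise ValueError(f"cannot fan out options of different lengths: {sorted(lists)}")
    (count,) = lengths
    expanded = []
    for i in range(count):
        copy = dict(options)  # non-list options are shared by every copy
        for name, values in lists.items():
            copy[name] = values[i]
        expanded.append(copy)
    return expanded
```

Each returned dictionary would correspond to one duplicated node, which is why node indices become necessary after expansion.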
Find a node by tool or node name, including its node index.
This searches the nodes for one whose name equals the given name. The full name consists of the tool name, followed by the node index if there is more than one node with the same name. A node index is typically assigned and used after pipeline expansion, which means you might have to append the correct index to the name of the node you are looking for.
This is necessary because multiplexing the pipeline cannot always guarantee unique node names: nodes might be duplicated based on the input of the pipeline. Therefore a unique node index is appended to the node name. You can inspect the pipeline nodes and their names by iterating over the nodes() method. Printing a node, or calling str on it, resolves the current node name.
If you assign a job name to the node, it overrides the node name and is used instead. Note, however, that the same indexing rules apply: if the graph contains more than one node with the same name, the node index is appended to the node/job name.
If the index is appended, the node name always has the form “<name>.<index>”.
For example, without any special assignment, the node name defaults to the name of the tool. If there is only one node with that name, no modifications are applied and the node index is ignored:
>>> p = Pipeline()
>>> p.run('bash', cmd='ls')
bash
>>> p.expand()
False
>>> assert p.get("bash") is not None
Parameters: name – node name
Returns: the node with the given name
Raises LookupError: if no such node exists
Returns the edge between source and target, or raises a KeyError if no such edge exists.
Returns: the edge between source and target
Raises KeyError: if no such edge exists
Sorts the nodes in topological order and then groups nodes together if they have a dependency and at least one of the dependency options is set to streaming.
Yields lists of nodes. Each list represents a group of tools that need to be executed in parallel to be able to pipe all streams.
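The grouping step can be sketched with the standard library's graphlib (Python 3.9+) for the topological sort and a small union-find to merge nodes joined by streaming edges. stream_groups and the (parent, child, streaming) edge triples are hypothetical; this illustrates the idea, not necessarily how jip builds its groups.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set, Tuple


def stream_groups(nodes: List[str],
                  edges: List[Tuple[str, str, bool]]) -> List[List[str]]:
    """edges are (parent, child, streaming) triples; groups come out in topological order."""
    predecessors: Dict[str, Set[str]] = {n: set() for n in nodes}
    for parent, child, _ in edges:
        predecessors[child].add(parent)  # TopologicalSorter wants node -> predecessors
    order = list(TopologicalSorter(predecessors).static_order())

    # Union-find: nodes connected by a streaming edge share one root.
    root = {n: n for n in nodes}

    def find(n: str) -> str:
        while root[n] != n:
            root[n] = root[root[n]]  # path halving
            n = root[n]
        return n

    for parent, child, streaming in edges:
        if streaming:
            root[find(child)] = find(parent)

    groups: Dict[str, List[str]] = {}
    for n in order:
        groups.setdefault(find(n), []).append(n)  # dicts keep first-seen order
    return list(groups.values())
```

For the chain a → b (streaming), b → c (not streaming), c → d (streaming), this yields the groups [a, b] and [c, d].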
Create a new job profile.
The job profile can be used to customize the execution behaviour of a job. Calling this method only creates a new job profile; it is not applied to any node in the graph. You can, however, create nodes from the job profile using Job.run() or Job.bash(). These nodes get a copy of the job profile, and the profile's properties are applied before job execution.
Returns: new job profile
Set the name of the pipeline and ensure that all nodes in the pipeline reference the pipeline name.
Parameters: name (string) – the name of the pipeline
Generator that yields the nodes of this pipeline.
Returns: the nodes of this pipeline
Return type: list of Node
Set the user-defined name of the pipeline.
Parameters: name (string) – the user-defined name of the pipeline
Remove the given tool or node from the pipeline graph.
Parameters: tool – tool or node
Find the tool specified by name and add it as a node to the pipeline graph.
All additional keyword arguments are passed as option configuration to the tool instance, allowing you to configure your tool when you create it.
Note that the tool's validate() method is called here silently: exceptions are caught and logged. This is necessary to allow tools to initialize themselves when they are added to a pipeline.
Returns: the newly added node
Raises jip.tool.ToolNotFoundException: if the specified tool could not be found
Takes a list of node names or node instances, removes each node, and tries to connect the node's parents to its children.
Parameters: excludes (list of string) – list of node names
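Skipping a node while keeping the graph connected can be sketched on a mapping from each node to its set of parents. skip is a hypothetical helper; unlike the method described above, which only tries to connect parents and children, this sketch reconnects them unconditionally without checking option compatibility.

```python
from typing import Dict, Iterable, Set


def skip(parents: Dict[str, Set[str]], names: Iterable[str]) -> Dict[str, Set[str]]:
    """Remove each named node and link its parents directly to its children."""
    graph = {node: set(ps) for node, ps in parents.items()}  # work on a copy
    for name in names:
        removed_parents = graph.pop(name)
        for ps in graph.values():
            if name in ps:
                ps.discard(name)
                ps.update(removed_parents)  # bridge over the removed node
    return graph
```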
Generator function that yields the nodes in the graph in topological order.
Please note that this function does not cache the order and recalculates it on each call. If you know the pipeline graph will not change any more and you have to iterate the nodes in order more than once, you might want to cache the results:
>>> pipeline = Pipeline()
>>> ordered = list(pipeline.topological_order())
Returns: yields nodes in topological order
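For readers curious how such an order can be produced lazily, here is a sketch using Kahn's algorithm on a mapping from node name to child names. Kahn's algorithm is a swapped-in, standard technique chosen for illustration, not necessarily what jip uses; like the method described above, it recomputes the order on every call.

```python
from collections import deque
from typing import Dict, Iterator, List


def topological_order(children: Dict[str, List[str]]) -> Iterator[str]:
    """Yield nodes so that every parent comes before its children."""
    indegree = {n: 0 for n in children}
    for kids in children.values():
        for kid in kids:
            indegree[kid] = indegree.get(kid, 0) + 1
    queue = deque(n for n, d in indegree.items() if d == 0)
    emitted = 0
    while queue:
        node = queue.popleft()
        emitted += 1
        yield node
        for kid in children.get(node, []):
            indegree[kid] -= 1
            if indegree[kid] == 0:
                queue.append(kid)
    if emitted != len(indegree):
        raise ValueError("graph contains a cycle")
```

As with the pipeline method, wrapping the generator in list(...) caches the order when it is needed more than once.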
Validate all nodes in the graph.