Job utilities that cover basic pipeline graph traversals and wrappers around common actions.
JIP pipelines consist of a set of jip.db.Job instances that might be interconnected through dependencies. The jobs can be executed either locally or on a compute cluster.
The jobs module provides the essential helper functions to create jobs from tools and pipelines. In addition, this module contains the functions to perform the most basic actions on jobs and sort and traverse a set of jobs.
JIP jobs are always created from jip.pipelines.Node instances, and this module contains helper functions to convert tools or pipelines into a set of jobs.
Pipeline nodes contain all the base information that is needed to create a jip.db.Job. You can use from_node() to create a single job instance from a node. This function is exposed so you can customize how pipeline nodes are translated into jobs, but the most commonly used function to create a set of jobs is create_jobs(). It takes either a tool or a pipeline instance and returns a set of jobs.
In addition to create_jobs(), check_output_files() can and should be used on a set of jobs to ensure that no output file is created by multiple jobs.
Create and return a jip.db.Job instance from a Node.
A dictionary with the job environment can be passed here to avoid creating the environment for each job.
Parameters:
Returns: the created job
Return type: jip.db.Job
Create a set of jobs from the given tool or pipeline. This expands the pipeline and creates a job per pipeline node.
You can specify a list of excludes. The list must contain job names. All jobs with these names will be excluded. This also covers all child jobs of an excluded job, effectively disabling the full subgraph that contains the excluded node.
After all jobs are created, they are validated and a ValidationError is raised if a job is not valid. Please note that the output files of the jobs are not checked automatically. You might want to call check_output_files() after you have created all your jobs.
Parameters:
Raises: jip.tools.ValidationError if a job is invalid
Return a list of named tuples that reference jobs that can be executed in the right order. The named tuples yielded by this generator have the following properties:
- name – a joined name created for each job group
- job – the Job instance that can be submitted or executed
- completed – boolean that indicates whether the job (and therefore all jobs in the job group) is in “Done” state and marked as completed
If you need to execute a pipeline, you can use this in conjunction with create_jobs() to yield a list of jobs that you might want to execute or submit:
>>> p = jip.Pipeline()
>>> files = p.bash("ls")
>>> count = p.bash("wc -l", input=files)
>>> p.context(locals())
>>> jobs = create_jobs(p)
>>> for r in create_executions(jobs):
... assert r.completed == False
... assert r.job is not None
... assert r.name == 'files|count'
>>>
Parameters:
Returns: list of named tuples with name, job, and completed properties
Raises ValidationError: if output file checks are enabled and duplications are detected
Ensures that there are no output file duplications in the given set of jobs and raises a ValidationError if there are.
Parameters: jobs – list of jobs
Raises ValidationError: if duplicated output files are found
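The duplicate-output check can be illustrated with a short sketch. Note that Job here is a hypothetical stand-in for jip.db.Job, and the attribute names used below are assumptions made for this example, not the real model:

```python
class ValidationError(Exception):
    """Raised when two jobs declare the same output file."""

class Job:
    # hypothetical stand-in for jip.db.Job; attribute names are assumptions
    def __init__(self, name, outputs):
        self.name = name
        self.outputs = outputs

def check_output_files(jobs):
    """Raise ValidationError if any output file is claimed by more than one job."""
    seen = {}  # output file -> name of the job that claims it
    for job in jobs:
        for out in job.outputs:
            if out in seen and seen[out] != job.name:
                raise ValidationError(
                    "%s is created by both %s and %s" % (out, seen[out], job.name))
            seen[out] = job.name

# two jobs with distinct outputs pass the check silently
check_output_files([Job("ls", ["files.txt"]), Job("count", ["counts.txt"])])
```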
Check if, for any of the given jobs, there are queued or running jobs that create the same output files. If that is the case, a ValidationError is raised.
Parameters:
Raises ValidationError: if there is a queued or running job that creates the same output as one of the jobs in the given list of jobs
The following methods can be used to perform basic actions on a single job. Please note that some of the methods can not be called in an arbitrary order on a set of jobs. For example, submit_job() must be called with jobs sorted in topological order to ensure that all parent jobs are submitted before any child jobs. You can use the topological_order() generator to ensure job order. For example:

    for job in jip.jobs.topological_order(jobs):
        jip.jobs.submit_job(job)

Here, we ensure the topological order when jobs are submitted.
Although all of the action methods take a single job instance, note that cancel() also affects dependent jobs. If you cancel a job, the job itself and, recursively, all jobs that depend on the canceled job are affected.
Please also note that some of the action methods provide a silent parameter. If it is set to False, the methods will print status information to stdout.
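The recursive effect of cancellation, and the silent parameter, can be sketched roughly like this; Job is a hypothetical stand-in for jip.db.Job, and the state names and attributes are assumptions for the example:

```python
class Job:
    # hypothetical stand-in; 'children' holds jobs that depend on this job
    def __init__(self, name):
        self.name = name
        self.state = "Queued"
        self.children = []

def cancel(job, silent=True):
    """Cancel a job and, recursively, every job that depends on it."""
    if job.state in ("Done", "Canceled"):
        return False
    if not silent:
        # the silent parameter controls status output, as described above
        print("Canceling", job.name)
    job.state = "Canceled"
    for child in job.children:
        cancel(child, silent=silent)
    return True

align = Job("align")
count = Job("count")
align.children.append(count)
cancel(align)
print(count.state)  # Canceled
```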
JIP jobs that are sent to a cluster are stored in a database. The database stores the job runtime information, and calling any of the action methods might affect the database state of a job. Please note that, except for delete() and run_job(), none of the action methods interact with the database or a database session directly. It is the caller's responsibility to commit changes to the database after an action method is called.
As mentioned above, the only exception to this rule are the delete() and run_job() methods, which take a database session.
Submit the given job to the cluster. This only submits jobs that are not DONE. The job has to be in canceled, failed, queued, or hold state to be submitted, unless force is set to True. This will NOT submit the child jobs. You have to submit the children yourself and ensure you do that in proper order.
If job submission is forced and a job is in active state, the job is canceled first to ensure there is only a single instance of the job on the cluster.
You have to set save to True in order to save the jobs after successful submission. This will use jip.db.create_session() to get a session instance.
If no cluster is specified, jip.cluster.get() is used to load the default cluster. This will raise a jip.cluster.ClusterImplementationError in case no compute cluster is configured.
Parameters:
Returns: True if the job was submitted
Raises jip.cluster.ClusterImplementationError: if no cluster could be loaded
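The submission rules above (skip finished jobs, resubmit only certain states unless forced, and cancel an active job before a forced resubmission) can be sketched as follows. All names here are hypothetical stand-ins for the example, not the real jip implementation:

```python
ACTIVE_STATES = {"Queued", "Running"}
SUBMITTABLE_STATES = {"Canceled", "Failed", "Queued", "Hold"}

class ToyCluster:
    # hypothetical stand-in for a jip cluster implementation
    def __init__(self):
        self.log = []

    def submit(self, job):
        self.log.append(("submit", job["name"]))

    def cancel(self, job):
        self.log.append(("cancel", job["name"]))

def submit_job(job, cluster, force=False):
    """Return True if the job was handed to the cluster."""
    if not force:
        if job["state"] == "Done":
            return False
        if job["state"] not in SUBMITTABLE_STATES:
            return False
    if force and job["state"] in ACTIVE_STATES:
        # ensure only a single instance of the job lives on the cluster
        cluster.cancel(job)
    cluster.submit(job)
    job["state"] = "Queued"
    return True

cluster = ToyCluster()
job = {"name": "align", "state": "Failed"}
submit_job(job, cluster)  # resubmits the failed job
```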
Execute the given job. This method returns immediately in case the job has a pipe source. Otherwise the job and all its dispatch jobs are executed.
NOTE that the run method creates a signal handler that sets the given job's state to failed in case the job's process is terminated by a signal.
Parameters:
Returns: True if the job was executed successfully
Return type: boolean
Hold the given job and make sure it is no longer on the cluster. The function only takes jobs that are in an active state and takes care of the cancellation of any children.
Parameters:
Cancel the given job and make sure it is no longer on the cluster. The function only takes jobs that are in an active state and takes care of the cancellation of any children.
Parameters:
Returns: True if the job was canceled
Delete the given job from the database and make sure it is no longer on the cluster. If the job's state is an active state, the job is canceled on the cluster. Job cancellation is only performed on jobs that are not pipe_to targets. Please note also that this method does NOT delete any dependencies; it operates ONLY on the given job instance.
You can use jip.jobs.get_subgraph() to get a full subgraph of a job, or jip.jobs.get_group_jobs() to create a list of all jobs that are related due to grouping or piping.
Parameters:
Remove job log files.
Parameters:
Transition a job to a new state.
The new job state is applied to the job and its embedded children. In case the job state becomes CANCELED, FAILED, or HOLD, and cleanup is not set to False, the job's tool is loaded and the job cleanup is performed.
The job transition also takes care of the start and finish dates on the job and sets them according to the new state.
Parameters:
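The state transition described above can be sketched as follows. Job is a hypothetical stand-in for jip.db.Job, and the state names, attribute names, and cleanup hook are assumptions made for this example:

```python
import datetime

FAILURE_STATES = ("Canceled", "Failed", "Hold")

class Job:
    # hypothetical stand-in; 'pipe_to' holds the embedded children
    def __init__(self, name):
        self.name = name
        self.state = "Queued"
        self.start_date = None
        self.finish_date = None
        self.pipe_to = []

    def clean(self):
        # stand-in for loading the job's tool and running its cleanup
        print("cleanup", self.name)

def set_state(job, new_state, cleanup=True):
    """Apply new_state to the job and its embedded children."""
    now = datetime.datetime.now()
    for j in [job] + job.pipe_to:
        j.state = new_state
        if new_state == "Running":
            j.start_date = now
        elif new_state == "Done" or new_state in FAILURE_STATES:
            j.finish_date = now
    if cleanup and new_state in FAILURE_STATES:
        job.clean()

job = Job("align")
set_state(job, "Running")
```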
The job submission process uses a few other functions of this module that are not strictly actions on a single job, but are useful to understand how the system handles jobs specifically with respect to submission:
Create a dictionary that contains the job's environment.
The job environment is loaded at execution time and is available in the process that runs the job's command. This function captures values from the current environment (usually the machine from which you submit your job) and stores them in a dictionary. The following environment variables are stored:
- PATH – the currently configured PATH is stored
- PYTHONPATH – we store the Python path in order to make sure that the JIP command line utilities work as expected and the same JIP version is loaded at job runtime
- JIP_PATH, JIP_MODULES, JIP_LOGLEVEL, JIP_DB_LOGLEVEL – any local modifications of the paths to search for tools or the module search paths are stored; in addition, the current log level is passed on to the job, which effectively allows you to debug JIP behaviour on the job level
- LD_LIBRARY_PATH – the library path is also stored in the environment
Parameters: profiler – if True, JIP_PROFILER is enabled
Returns: dictionary that contains the job environment
Return type: dict
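A minimal sketch of how such an environment snapshot can be built; the variable list comes from the description above, while the exact JIP_PROFILER handling is an assumption for this example:

```python
import os

# variable list taken from the description above
STORED_VARS = (
    "PATH", "PYTHONPATH", "LD_LIBRARY_PATH",
    "JIP_PATH", "JIP_MODULES", "JIP_LOGLEVEL", "JIP_DB_LOGLEVEL",
)

def create_job_env(profiler=False):
    """Snapshot the submitting machine's environment for the job."""
    env = {name: os.environ[name] for name in STORED_VARS if name in os.environ}
    if profiler:
        # exact flag value is an assumption made for this sketch
        env["JIP_PROFILER"] = "True"
    return env

env = create_job_env()
```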
Takes a list of jobs and returns all jobs of all pipeline graphs involved, sorted in topological order. In contrast to get_subgraph(), this first traverses up in the tree to find all parent nodes involved.
Parameters: jobs – list of input jobs
Returns: list of all jobs of all pipelines that are touched by the given jobs
We mentioned earlier that some of the action methods that can be called with a job depend on the order of jobs. This is important in particular for any method that relies on job dependencies. For example, the submit_job() method assumes that any dependencies of a given job are already submitted. The jip.jobs module provides a set of helper functions that allow you to sort a list of jobs or extract certain sub-sets from a graph of jobs.
Generator that yields the elements of the given list of jobs in topological order. NOTE that this does NOT resolve any dependencies and only yields the jobs given as parameter.
Parameters: jobs (list of jip.db.Job) – list of jobs
Returns: yields the given jobs in topological order
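One way to implement such a generator is Kahn's algorithm, restricted to the given jobs so that outside dependencies are ignored rather than resolved, mirroring the note above. Job and its dependencies attribute are hypothetical stand-ins for this sketch:

```python
from collections import deque

class Job:
    # hypothetical stand-in; 'dependencies' lists the parent jobs
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def topological_order(jobs):
    """Yield the given jobs so that parents always come before children.

    Dependencies that are not part of the given list are ignored.
    """
    given = {id(j) for j in jobs}
    # count only dependencies that are part of the given list
    pending = {id(j): sum(1 for d in j.dependencies if id(d) in given)
               for j in jobs}
    children = {id(j): [] for j in jobs}
    for j in jobs:
        for d in j.dependencies:
            if id(d) in given:
                children[id(d)].append(j)
    ready = deque(j for j in jobs if pending[id(j)] == 0)
    while ready:
        job = ready.popleft()
        yield job
        for child in children[id(job)]:
            pending[id(child)] -= 1
            if pending[id(child)] == 0:
                ready.append(child)

ls = Job("ls")
wc = Job("wc -l", dependencies=[ls])
print([j.name for j in topological_order([wc, ls])])  # ['ls', 'wc -l']
```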
Takes a list of jobs and walks up the graph for each job to find all jobs that are connected to a job in the given list but have no incoming dependencies.
NOTE that the returned list is not sorted.
Parameters: jobs – list of jobs
Returns: list of all parent jobs
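The upward walk can be sketched with a simple stack-based traversal; Job and its dependencies attribute are hypothetical stand-ins for this example:

```python
class Job:
    # hypothetical stand-in; 'dependencies' lists the parent jobs
    def __init__(self, name, dependencies=None):
        self.name = name
        self.dependencies = dependencies or []

def get_parents(jobs):
    """Walk up from the given jobs and collect all jobs without incoming dependencies."""
    roots, seen, stack = [], set(), list(jobs)
    while stack:
        job = stack.pop()
        if id(job) in seen:
            continue
        seen.add(id(job))
        if not job.dependencies:
            roots.append(job)  # no incoming dependencies: a root of the graph
        else:
            stack.extend(job.dependencies)
    return roots
```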
Check if the job has a pipe_from parent and, if so, return it. If the job does not have any pipe parents, the job itself is returned.
Parameters: job (jip.db.Job) – the job
Returns: the pipe source job, or the job itself if no pipe parent is found
Returns a list of all jobs that are children of the given job, plus the given job itself. In other words, this resolves the full subgraph of jobs that the given job belongs to. If the given job receives piped input, the pipe parent is used as root for the subgraph.
Parameters: job (jip.db.Job) – the job
Returns: all jobs, including the given one, that form a subgraph in the execution graph where the given job is the root
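The subgraph resolution, including the re-rooting at the pipe parent, can be sketched as a breadth-first traversal. Job and its children/pipe_from attributes are hypothetical stand-ins for this example:

```python
class Job:
    # hypothetical stand-in; 'children' depend on this job, 'pipe_from' lists pipe parents
    def __init__(self, name):
        self.name = name
        self.children = []
        self.pipe_from = []

def get_subgraph(job):
    """Return the job and all its children, rooted at the pipe parent if present."""
    while job.pipe_from:
        # if the job receives piped input, its pipe parent becomes the root
        job = job.pipe_from[0]
    result, seen, queue = [], set(), [job]
    while queue:
        current = queue.pop(0)
        if id(current) in seen:
            continue
        seen.add(id(current))
        result.append(current)
        queue.extend(current.children)
    return result
```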
Group jobs that will be executed in one step. This returns a list of lists. Each list starts with the ‘primary’ job. This job is the ONLY job that has to be executed. But note that when you submit jobs to a cluster, all jobs of a group have to be submitted. Note that the list of jobs will not be reordered. The list of groups will reflect the ordering of the input jobs.
Parameters: jobs (list of jobs) – list of jobs
Returns: the list of groups as a list of lists of jobs
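The grouping can be sketched by following pipe links from each unassigned job, preserving the input order as described above. Job and its pipe_to attribute are hypothetical stand-ins, and the sketch assumes the input list is ordered so that pipe sources appear before their targets:

```python
class Job:
    # hypothetical stand-in; 'pipe_to' jobs receive this job's output through a pipe
    def __init__(self, name):
        self.name = name
        self.pipe_to = []

def create_groups(jobs):
    """Group jobs that execute in one step; each group starts with its primary job.

    Assumes 'jobs' is ordered so that pipe sources appear before their targets.
    """
    assigned, groups = set(), []
    for job in jobs:  # keep the ordering of the input list
        if id(job) in assigned:
            continue
        group, queue = [], [job]
        while queue:
            current = queue.pop(0)
            if id(current) in assigned:
                continue
            assigned.add(id(current))
            group.append(current)
            queue.extend(current.pipe_to)
        groups.append(group)
    return groups

ls = Job("ls")
wc = Job("wc -l")
ls.pipe_to.append(wc)
print([[j.name for j in g] for g in create_groups([ls, wc])])  # [['ls', 'wc -l']]
```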