jip.jobs

Job utilities that cover basic pipeline graph traversals and wrappers around common actions.

JIP pipelines consist of a set of jip.db.Job instances that might be interconnected through dependencies. The jobs can be executed either locally or on a compute cluster.

The jobs module provides the essential helper functions to create jobs from tools and pipelines. In addition, this module contains functions to perform the most basic actions on jobs and to sort and traverse a set of jobs.

Job creation

JIP jobs are always created from jip.pipelines.Node instances, but this module contains helper functions to convert tools or pipelines into a set of jobs.

Pipeline nodes contain all the base information needed to create a jip.db.Job. You can use from_node() to create a single job instance from a node. This function is exposed so you can customize how pipeline nodes are translated into jobs, but the most commonly used function to create a set of jobs is create_jobs(). It takes either a tool or a pipeline instance and returns a set of jobs.

In addition to create_jobs(), check_output_files() can and should be used on a set of jobs to ensure that no output file is created by multiple jobs.
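
For example, a minimal sketch (modeled on the create_executions() example below) that expands a small pipeline into jobs and verifies the output files:

import jip
import jip.jobs

# a small two-step pipeline: list files, then count them
p = jip.Pipeline()
files = p.bash("ls")
count = p.bash("wc -l", input=files)
p.context(locals())

# expand the pipeline into one job per node ...
jobs = jip.jobs.create_jobs(p)
# ... and make sure no output file is created by more than one job
jip.jobs.check_output_files(jobs)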

jip.jobs.from_node(node, env=None, keep=False)

Create and return a jip.db.Job instance from a Node.

A dictionary with the job environment can be passed here to avoid creating the environment for each job.

Parameters:
  • node (jip.pipelines.Node) – the node
  • env – the environment stored for the job. If None, this will be generated.
  • keep (bool) – keep the job output on failure
Returns:

the created job

Return type:

jip.db.Job

jip.jobs.create_jobs(source, args=None, excludes=None, skip=None, keep=False, profile=None, validate=True, profiler=False)

Create a set of jobs from the given tool or pipeline. This expands the pipeline and creates a job per pipeline node.

You can specify a list of excludes. The list must contain job names. All jobs with these names will be excluded. This also covers all child jobs of an excluded job, effectively disabling the full subgraph that contains the excluded node.

After all jobs are created, they are validated and a ValidationError is raised if a job is not valid. Please note that the output files of the jobs are not checked automatically. You might want to call check_output_files() after you have created all your jobs.

Parameters:
  • source (jip.pipelines.Pipeline or jip.tools.Tool) – a pipeline or a tool
  • args – dictionary of arguments that is applied to tool instances
  • excludes – excludes nodes by name. This removes the node and the full subgraph below it
  • skip – skip the node. This does not touch the subgraph but tries to connect the node's input with the node's output before the node is removed
  • keep – keep the job's output on failure
  • profile – default job profile that will be applied to all jobs
  • validate – set this to False to disable job validation
  • profiler – set to True to enable the job profiler
Raises jip.tools.ValidationError:
 if a job is invalid
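
As a sketch of the excludes and skip options (assuming skip, like excludes, takes a list of node names; 'align' and 'stats' are hypothetical names here):

import jip.jobs
from jip.tools import ValidationError

try:
    jobs = jip.jobs.create_jobs(
        p,                   # a pipeline or tool instance
        excludes=['stats'],  # drop the 'stats' node and its full subgraph
        skip=['align'],      # remove 'align' but reconnect its in- and outputs
        keep=True            # keep output files of failed jobs
    )
except ValidationError as err:
    print("job validation failed: %s" % err)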

jip.jobs.create_executions(jobs, check_outputs=True, check_queued=True, save=False)

Return a list of named tuples that reference jobs that can be executed, in the right order. The named tuples yielded by this generator have the following properties:

name
a joined name created for each job group
job
the Job instance that can be submitted or executed.
completed
boolean that indicates whether the job (and therefore all jobs in the job group) is in “Done” state and marked as completed.

If you need to execute a pipeline, you can use this in conjunction with create_jobs() to yield a list of jobs that you might want to execute or submit:

>>> p = jip.Pipeline()
>>> files = p.bash("ls")
>>> count = p.bash("wc -l", input=files)
>>> p.context(locals())
>>> jobs = create_jobs(p)
>>> for r in create_executions(jobs):
...     assert r.completed == False
...     assert r.job is not None
...     assert r.name == 'files|count'
>>>
Parameters:
  • jobs – list of input jobs
  • check_outputs – if True, duplicated output file names are checked and a ValidationError is raised if duplications are detected
  • check_queued – if True, the jobs are checked against queued and running jobs that create the same output files (see check_queued_jobs())
  • save – if True, all jobs are added to a new session and the session is committed if no exception occurs
Returns:

list of named tuples with name, job, and completed properties

Raises ValidationError:
 if output file checks are enabled and duplications are detected
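
A typical submission loop built on top of this then looks roughly like the following sketch (submit_job() is described below):

import jip.jobs

jobs = jip.jobs.create_jobs(p)
# save=True stores all jobs in the database in a single session
for run in jip.jobs.create_executions(jobs, save=True):
    if run.completed:
        print("skipping %s, already done" % run.name)
        continue
    jip.jobs.submit_job(run.job)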

jip.jobs.check_output_files(jobs)

Ensures that there are no output file duplications in the given set of jobs and raises a ValidationError if there are.

Parameters:jobs – list of jobs
Raises ValidationError:
 if duplicated output files are found
jip.jobs.check_queued_jobs(jobs, active_jobs=None)

Check if, for any of the given jobs, there are queued or running jobs that create the same output files. If that is the case, a ValidationError is raised.

Parameters:
  • jobs – the list of jobs to check
  • active_jobs – list of jobs to check against. If not specified, the database is queried for all active jobs
Raises ValidationError:
 if there is a queued or running job that creates the same output as one of the jobs in the given list

Job actions

The following methods can be used to perform basic actions on a single job. Please note that some of the methods cannot be called in an arbitrary order on a set of jobs. For example, submit_job() must be called with jobs sorted in topological order to ensure that all parent jobs are submitted before any child jobs. You can use the topological_order() generator to ensure job order. For example:

for job in jip.jobs.topological_order(jobs):
    jip.jobs.submit_job(job)

Here, we ensure the topological order when jobs are submitted.

Although all of the action methods take a single job instance, note that cancel() also affects dependent jobs. If you cancel a job, the job itself and, recursively, all jobs that depend on the canceled job are affected.

Please also note that some of the action methods provide a silent parameter. If it is set to False, the methods will print status information to stdout.

JIP jobs that are sent to a cluster are stored in a database. The database stores the job runtime information, and calling any of the action methods might affect the database state of a job. Please note that, except for delete() and run_job(), none of the action methods interact with the database or a database session directly. It is the caller's responsibility to commit changes to the database after an action method is called.

As mentioned above, the only exceptions to this rule are the delete() and run_job() methods, which interact with the database directly.
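
For example, a minimal sketch of the commit-after-action pattern, assuming jobs is a list of jobs attached to a session created with jip.db.create_session():

import jip.db
import jip.jobs

session = jip.db.create_session()
for job in jip.jobs.topological_order(jobs):
    # updates the job state, but does not touch the database
    jip.jobs.cancel(job, cancel_children=False)
# persisting the changes is the caller's responsibility
session.commit()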

jip.jobs.submit_job(job, clean=False, force=False, save=True, cluster=None)

Submit the given job to the cluster. This only submits jobs that are not DONE. The job has to be in canceled, failed, queued, or hold state to be submitted, unless force is set to True. This will NOT submit the child jobs. You have to submit the children yourself and ensure you do that in proper order.

If job submission is forced and a job is in active state, the job is canceled first to ensure there is only a single instance of the job on the cluster.

You have to set save to True in order to save the jobs after successful submission. This will use jip.db.create_session() to get a session instance.

If no cluster is specified, jip.cluster.get() is used to load the default cluster. This will raise a jip.cluster.ClusterImplementationError in case no compute cluster is configured.

Parameters:
  • job – the job to be submitted
  • clean – if True, the job log files will be deleted before submission
  • force – force job submission
  • save – if True, job will be saved to the database
  • cluster – the compute cluster instance. If None, the default cluster will be loaded from the jip configuration
Returns:

True if the job was submitted

Raises jip.cluster.ClusterImplementationError:
 if no cluster could be loaded
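
Putting the pieces together, a submission sketch that loads the default cluster once and submits a list of jobs in topological order:

import jip.cluster
import jip.jobs

# raises jip.cluster.ClusterImplementationError if no cluster is configured
cluster = jip.cluster.get()

# parents first, children later
for job in jip.jobs.topological_order(jobs):
    if jip.jobs.submit_job(job, cluster=cluster):
        print("submitted %s" % job)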

jip.jobs.run_job(job, save=False, profiler=False, submit_embedded=False, closeDB=False)

Execute the given job. This method returns immediately in case the job has a pipe source. Otherwise the job and all its dispatch jobs are executed.

NOTE that the run method installs a signal handler that sets the given job's state to failed in case the job's process is terminated by a signal.

Parameters:
  • job (jip.db.Job) – the job to run. Note that jobs with pipe sources are ignored
  • save – if True, the job's state changes are persisted in the database
  • profiler – if set to True, job profiling is enabled
  • submit_embedded – if True, embedded pipelines will be submitted and not executed directly
Returns:

True if the job was executed successfully

Return type:

boolean
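
For local execution, a sketch that runs one job per job group (see create_groups() below); the 'primary' job of each group drives any embedded or piped jobs:

import jip.jobs

for group in jip.jobs.create_groups(jobs):
    if not jip.jobs.run_job(group[0]):
        break  # stop at the first failure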

jip.jobs.hold(job, clean_job=False, clean_logs=False, hold_children=True)

Hold the given job and make sure it is no longer on the cluster. The function takes only jobs that are in active state and takes care of the cancellation of any children.

Parameters:
  • job – the job
  • clean_logs – if True, the job log files will be deleted
  • clean_job – if True, the job results will be removed
  • hold_children – set this to False to disable holding the children of the given job
jip.jobs.cancel(job, clean_job=False, clean_logs=False, cluster=None, save=False, cancel_children=True)

Cancel the given job and make sure it is no longer on the cluster.

The function takes only jobs that are in active state and takes care of the cancellation of any children.

Parameters:
  • job (jip.db.Job) – the job
  • clean_logs – if True, the job log files will be deleted
  • clean_job – if True, the job results will be removed
  • cluster – if no cluster is specified and this is the parent job in a group, the default cluster is loaded
  • save – if True, save job in database after state change
  • cancel_children – set this to False to disable canceling children of a given job
Returns:

True if job was canceled

jip.jobs.delete(job, clean_logs=False, cluster=None)

Delete the given job from the database and make sure it is no longer on the cluster. If the job is in an active state, it is canceled on the cluster. Job cancellation is only performed on jobs that are not pipe_to targets. Please note also that this method does NOT delete any dependencies; it operates ONLY on the given job instance.

You can use jip.jobs.get_subgraph() to get a full subgraph of a job, or jip.jobs.get_group_jobs() to create a list of all jobs that are related due to grouping or piping.

Parameters:
  • job (jip.db.Job) – the job to be deleted
  • clean_logs (boolean) – if True, the job log files will be deleted
  • cluster – the cluster instance used to cancel jobs. If not specified, the cluster is loaded from the configuration
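
For example, to remove a job together with everything that depends on it, a sketch using get_subgraph() as suggested above:

import jip.jobs

# the subgraph contains the given job itself plus all its children
for j in jip.jobs.get_subgraph(job):
    jip.jobs.delete(j, clean_logs=True)
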
jip.jobs.clean(job, cluster=None)

Remove job log files.

Parameters:
  • job (jip.db.Job) – the job to be cleaned
  • cluster – the cluster instance used to resolve logs. If not specified, the cluster instance is loaded from the configuration
jip.jobs.set_state(job, new_state, update_children=True, cleanup=True, check_state=False)

Transition a job to a new state.

The new job state is applied to the job and its embedded children. In case the job state becomes CANCELED, FAILED, or HOLD, and cleanup is not set to False, the job's tool is loaded and the job cleanup is performed.

The job transition also takes care of the start and finish dates on the job and sets them according to the new state.

Parameters:
  • new_state – the new job state
  • job – the job instance
  • update_children – if set to False, pipe_to jobs are not updated
  • cleanup – if True, the tool cleanup is performed for canceled or failed jobs
  • check_state – if True, the current job state is loaded from the database, and if the new state is FAILED while the current database state is CANCELED, the new state becomes CANCELED. This is used to prevent jobs that are CANCELED from being set to FAILED when they are removed from a compute cluster
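
A sketch of a manual state transition, assuming the state constants are exposed as jip.db.STATE_* (for example jip.db.STATE_FAILED):

import jip.db
import jip.jobs

# mark the job and its pipe_to children as failed, but skip the
# tool cleanup so that partial results stay on disk
jip.jobs.set_state(job, jip.db.STATE_FAILED, cleanup=False)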

The job submission process uses a few other functions of this module that are not strictly actions on a single job, but are useful to understand how the system handles jobs specifically with respect to submission:

jip.jobs.create_job_env(profiler=False)

Create a dictionary that contains the jobs’ environment.

The job environment is loaded at execution time and is available in the process that runs the job's command. It captures the values from the current environment (usually the machine from which you submit your job) in a dictionary. The following environment variables are stored:

PATH
The currently configured PATH is stored
PYTHONPATH
We store the python path in order to make sure that the JIP command line utilities work as expected and the same JIP version is loaded at job runtime.
JIP_PATH, JIP_MODULES, JIP_LOGLEVEL, JIP_DB_LOGLEVEL
Any local modification of the paths to search for tools or the module search paths is stored. In addition, the current log level is passed on to the job, which effectively allows you to debug JIP behaviour on the job level
LD_LIBRARY_PATH
The library path is also stored in the environment
Parameters:profiler – if True, JIP_PROFILER is enabled
Returns:dictionary that contains the job environment
Return type:dict
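
For example, a sketch that creates the environment once and reuses it for every job; nodes stands in for the expanded pipeline's nodes here:

import jip.jobs

env = jip.jobs.create_job_env()
# reuse the same environment dictionary instead of recreating it per job
jobs = [jip.jobs.from_node(node, env=env) for node in nodes]
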
jip.jobs.resolve_jobs(jobs)

Takes a list of jobs and returns all jobs of all pipeline graphs involved, sorted in topological order. In contrast to get_subgraph(), this first traverses up in the tree to find all parent nodes involved.

Parameters:jobs – list of input jobs
Returns:list of all jobs of all pipelines that are touched by the jobs
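
A short sketch, where selected is a hypothetical subset of jobs, for example the result of a database query:

import jip.jobs

# expand the selection to the full pipeline graphs involved;
# the result is already in topological order
for job in jip.jobs.resolve_jobs(selected):
    print(job)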

Job iteration and sorting

We mentioned earlier that some of the action methods that can be called with a job depend on the order of jobs. This is important in particular for any method that relies on job dependencies. For example, the submit_job() method assumes that any dependencies of a given job are already submitted. The jip.jobs module provides a set of helper functions that allow you to sort a list of jobs or extract certain subsets from a graph of jobs.

jip.jobs.topological_order(jobs)

Generator that yields the elements of the given list of jobs in topological order. NOTE this does NOT resolve any dependencies and only yields the jobs given as parameter

Parameters:jobs (list of jip.db.Job) – list of jobs
Returns:yields given jobs in topological order
jip.jobs.get_parents(jobs, _parents=None)

Takes a list of jobs and walks up the graph for each job to find all jobs that are connected to a job in the given list but have no incoming dependencies.

NOTE that the returned list is not sorted.

Parameters:jobs – list of jobs
Returns:list of all parent jobs
jip.jobs.get_pipe_parent(job)

Check if the job has a pipe_from parent and, if so, return it. If the job does not have any pipe source, the job itself is returned.

Parameters:job (jip.db.Job) – the job
Returns:pipe source job or the job itself if no pipe parent is found
jip.jobs.get_subgraph(job, _all_jobs=None)

Returns a list of all jobs that are children of the given job, plus the given job itself. In other words, this resolves the full subgraph of jobs that the given job belongs to. If the given job receives piped input, the pipe parent is used as root for the subgraph.

Parameters:job (jip.db.Job) – the job
Returns:all jobs including the given one that form a subgraph in the execution graph where the given job is the root
jip.jobs.create_groups(jobs)

Group jobs that will be executed in one step. This returns a list of lists. Each list starts with the ‘primary’ job. This job is the ONLY job that has to be executed. But note that when you submit jobs to a cluster, all jobs of a group have to be submitted. Note that the list of jobs will not be reordered. The list of groups will reflect the ordering of the input jobs.

Parameters:jobs (list of jobs) – list of jobs
Returns:the list of groups as a list of lists of jobs
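
For example, a sketch that inspects the groups of a list of jobs:

import jip.jobs

for group in jip.jobs.create_groups(jobs):
    primary = group[0]
    # only the primary job is executed directly; the other jobs of the
    # group run embedded, for example through pipes
    print("%s drives %d embedded job(s)" % (primary, len(group) - 1))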