jip.db

JIP jobs that are submitted to a compute cluster are stored in a database that is accessible to all running jobs. This is currently how jobs report and update their state.

JIP uses SQLAlchemy as an abstraction layer to the database. By default, a user-specific sqlite database is used to store the data, but you can use any valid database URL in your configuration.

This module contains a few helper functions to create a database session, and the main :class:Job class that is used as a container to store jobs in the database.

Database access

jip.db.init(path=None, in_memory=False, pool=None)

Initialize the database.

This takes a valid SQLAlchemy database URL or a path to a file and creates the database. If a file path is given, a sqlite database is created.

Parameters:
  • path – database url or path to a file
  • in_memory – if set to True, an in-memory database is created
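
A minimal usage sketch based on the signature above (the file path is only an example):

    import jip.db

    # create/open a sqlite database stored in a file ...
    jip.db.init(path="/tmp/jip_jobs.db")

    # ... or, alternatively, use a transient in-memory database (e.g. for tests)
    # jip.db.init(in_memory=True)
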
jip.db.get(job_id)

Get a fresh copy of the given job by id and return None if the job could not be found.

Parameters:job_id – the job id
Returns:the job instance or None
jip.db.query(job_ids=None, cluster_ids=None, archived=False, fields=None, pipeline_name=None)

Query the database for jobs.

You can limit the search to a specific set of job ids using either the job ids or the remote cluster ids. If none are specified, all jobs are queried.

By default the search is limited to non-archived jobs. You can set the archived parameter to True to query only archived jobs or to None to query both.

In addition, you can use the fields parameter to limit the fields that are retrieved by the query for each job. By default, all fields are retrieved.

Parameters:
  • job_ids – iterable of job ids
  • cluster_ids – iterable of cluster ids
  • archived – set to True to query archived jobs and to None to query all jobs
  • fields – list of field names that should be retrieved
  • pipeline_name – name of the pipeline whose jobs should be retrieved
Returns:

iterator over the query results
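
For illustration, a query sketch (the pipeline name and job ids are placeholders; the default database configuration is assumed):

    import jip.db

    # all non-archived jobs of a pipeline named "mapping" (hypothetical name)
    for job in jip.db.query(pipeline_name="mapping"):
        print(job.id, job.state)

    # a specific set of job ids, including archived jobs
    jobs = list(jip.db.query(job_ids=[1, 2, 3], archived=None))
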

jip.db.query_by_files(inputs=None, outputs=None, and_query=False)

Query the database for jobs that reference the given input or output file. NOTE that the queries are performed ONLY against absolute paths!

By default, if both inputs and outputs are specified, an OR query is performed. You can set the and_query parameter to True to switch to AND.

Parameters:
  • inputs – list of absolute path file names or a single file name
  • outputs – list of absolute path file names or a single file name
  • and_query – query for jobs with inputs AND outputs instead of OR
Returns:

iterator over all jobs that reference one of the given files
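
A sketch of both query modes (the file paths are hypothetical, but note that they must be absolute):

    import jip.db

    # jobs that read input.txt OR write result.txt (default OR semantics)
    jobs = list(jip.db.query_by_files(inputs="/data/input.txt",
                                      outputs="/data/result.txt"))

    # jobs that reference both files (AND semantics)
    both = list(jip.db.query_by_files(inputs="/data/input.txt",
                                      outputs="/data/result.txt",
                                      and_query=True))
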

jip.db.get_all()

Returns an iterator over all jobs in the database

jip.db.save(jobs)

Save a single job or a list of jobs. This also cascades over all dependencies!

Parameters:jobs – single job or list of jobs
jip.db.delete(jobs)

Delete a job or a list of jobs. This does NOT resolve any dependencies but removes the relationships.

Note that no search on the job's dependencies is performed. You have to manually create the job list with all the jobs you want to delete. You can use jip.jobs.get_subgraph() to get the full subgraph of a job, or jip.jobs.get_group_jobs() to create a list of all jobs that are related due to grouping or piping.

Parameters:jobs – single job or list of jobs
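
A small sketch of save() and delete(); the job id is a placeholder:

    import jip.db

    job = jip.db.get(1)      # hypothetical job id
    if job is not None:
        jip.db.save(job)     # persists the job; cascades over its dependencies
        jip.db.delete(job)   # removes the job; only relationship entries are cleaned up
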

Module Methods

jip.db.create_session(embedded=False)

Creates and returns a new SQLAlchemy session instance and initializes the database if it was not initialized yet.

Parameters:embedded – start the database in embedded mode
Returns:a new SQLAlchemy session
jip.db.commit_session(session)

Helper to work around the locking issues that can happen with sqlite and session commits.

This is a very naive approach: we simply try a couple of times to commit the session. If the commit fails, we recreate the session, merge dirty objects, add new ones, and delete removed objects. The new session is then returned.

Returns:the old session if everything went fine, otherwise the newly created session
Raises Exception:
 if retrying could not resolve the problem
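
A usage sketch; because commit_session() may hand back a new session after a retry, keep working with its return value:

    import jip.db

    session = jip.db.create_session()
    # ... add or modify Job instances through the session ...
    session = jip.db.commit_session(session)
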
jip.db.update_job_states(jobs)

Takes a list of jobs and updates the job state, the remote id, and the job's start/finish dates, as well as the stdout and stderr paths.

Note that no search on the job's dependencies is performed. You have to manually create the job list with all the jobs you want updated. You can use jip.jobs.get_subgraph() to get the full subgraph of a job, or jip.jobs.get_group_jobs() to create a list of all jobs that are related due to grouping or piping.

Parameters:jobs – list of jobs or single job
jip.db.get_current_state(job)

Returns the current state of the job, fetched from the database. Note that you usually don’t need to use this method. The job object's state, especially when just fetched from the database, is most probably accurate. This method exists to check job states after the execution of long-running jobs.

Parameters:job – the job
Returns:the job's state as stored in the database
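
For example, to refresh the in-memory view of a previously fetched job (a sketch; the job id is a placeholder):

    import jip.db

    job = jip.db.get(1)                    # hypothetical job id
    state = jip.db.get_current_state(job)  # state as currently stored in the DB
    if state != job.state:
        job.state = state                  # adopt the persisted state
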
jip.db.get_active_jobs()

Returns all jobs that are not DONE, FAILED, or CANCELED

jip.db.update_archived(jobs, state)

Takes a list of jobs and updates the job archived flag.

Note that no search on the job's dependencies is performed. You have to manually create the job list with all the jobs you want updated. You can use jip.jobs.get_subgraph() to get the full subgraph of a job, or jip.jobs.get_group_jobs() to create a list of all jobs that are related due to grouping or piping.

Parameters:jobs – list of jobs or single job
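
A sketch that archives a job together with its subgraph; the job id is a placeholder, and passing True as the archived state is an assumption:

    import jip.db
    import jip.jobs

    job = jip.db.get(42)                # hypothetical job id
    jobs = jip.jobs.get_subgraph(job)   # the job plus all related jobs
    jip.db.update_archived(jobs, True)  # set the archived flag on all of them
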

Persisted properties

JIP jobs that are submitted to the cluster are stored in the job database. The jobs are wrapped in the jip.db.Job class, and the following properties are stored in the database and are available as instance attributes when a job is fetched from the database.
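
For illustration, fetching a job and reading a few of the attributes described below (the job id is a placeholder):

    import jip.db

    job = jip.db.get(1)
    if job is not None:
        print(job.name, job.state, job.threads, job.max_memory)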

Job.id

The primary job id

Job.job_id

The remote job id set after submission to a remote cluster

Job.name

User specified name for the job

Job.user

Stores the user name of the user that submitted the job

Job.project

Optional user specified project name

Job.pipeline

Optional pipeline name to group jobs

Job.path

Absolute path to the JIP script that created this job. This is currently only set for JIP scripts, not for tools that are loaded from a python module.

Job.tool_name

Name of the tool

Job.archived

A job can be archived to hide finished jobs while keeping their information. This flag indicates whether the job is archived.

Job.temp

This is used to mark jobs as temporary. Temporary jobs can be handled differently when jobs or pipelines are restarted or when a global cleanup function is called.

Job.create_date

Creation date of the job

Job.start_date

Start date of the job

Job.finish_date

Finish date of the job

Job.state

Current job state. See job states <job_states> for more information

Job.hosts

Optional name of the host that executes this job. This has to be set by the cluster implementation at runtime. If the cluster implementation does not support this, the field might not be set.

Job.queue

Stores the name of the queue this job will be submitted to. Interpretation of this field depends on the cluster implementation

Job.priority

Stores the priority assigned to the job. Interpretation of this field depends on the cluster implementation

Job.account

Account information assigned to the job

Job.threads

Number of threads assigned to a job. Defaults to 1

Job.nodes

Number of nodes assigned to a job. This is stored as a string in order to support node ranges. Defaults to None

Job.tasks

Number of tasks assigned to a job. Defaults to 0

Job.tasks_per_node

Number of tasks per node. Defaults to 0

Job.environment

Environment name (used for example as SGE parallel environment)

Job.max_memory

Maximum memory assigned to a job in MB

Job.max_time

Maximum wall clock time assigned to a job in minutes

Job.working_directory

The job's working directory. This defaults to the current working directory.

Job.stdout

The job's stdout log file. This can contain placeholders like %J that are filled, for example, with the job id to create the final path. The cluster implementation provides a way to resolve such a path.

Job.stderr

The job's stderr log file. This can contain placeholders like %J that are filled, for example, with the job id to create the final path. The cluster implementation provides a way to resolve such a path.

Job.env

Stores parts of the job environment to allow clean restarts and moves of a Job even if the user's current environment settings have changed. See create_job_env() for more information about the environment stored by default.

Job.keep_on_fail

If explicitly set to True, Job output will not be removed in a cleanup step after a job failed or was canceled.

Job.command

The fully rendered job command that will be executed by this job. NOTE that this is the final command executed by the job, NOT the command that is sent to the cluster. You can get the command sent to the cluster using the :py:meth:jip.db.Job.get_cluster_command method of the job.

Job.interpreter

The interpreter that will be used to run the command

Job.configuration

The configuration that is used to populate the command template. This stores a version of the tool's Options instance.

Job.pipe_targets

Stores output files that were moved out of the configuration in order to support a dispatcher pipe that writes to the files in this list as well as to the stdin of other jobs

Job.extra

Extra configuration stored as an array of additional parameters passed during job submission to the cluster implementation

Job.dependencies

List of “parent” jobs this job depends on

Job.children

List of “child” jobs that depend on this job

Job.pipe_to

List of jobs that will run in parallel with this job and to which this job's stdout stream is piped (dispatched).

Job.pipe_from

List of jobs that will run in parallel with this job and whose stdout stream is piped (dispatched) into this job's stdin

Job.group_to

List of jobs that will be executed sequentially but in a single job on the remote cluster

Job.group_from

List of jobs that will be executed sequentially but in a single job on the remote cluster

Job.tool

Access the tool instance (see jip.tools.Tool) that is executed by this job. The tool instance will be fully populated with the configuration stored in this job.

Job Utility functions

The Job class exposes the following utility functions:

Job.get_pipe_targets()

Returns a list of output files that the stdout content of this job will be written to if the job's output stream is also piped to some other process.

Returns:list of output files or an empty list
Job.is_stream_source()

Returns True if this job has child jobs that receive the output stream of this job

Returns:True if the job pipes its data to another job
Job.is_stream_target()

Returns True if this job takes the output stream of at least one parent as input.

Returns:True if this Job receives its data as a stream from another job
Job.terminate()

Terminate a currently running process that executes this job. NOTE that this method does NOT perform any cleanup operations or state updates, it simply terminates the underlying process.

Job.run()

Execute a single job. Note that no further checks on the job are performed and this method assumes that the job's stream_in and stream_out are properly connected.

Returns:the process
Raises Exception:
 if the interpreter was not found
Job.get_cluster_command()

Returns the command that should be sent to the cluster to run this job.

Returns:the command sent to the cluster
Job.validate()

Delegates to the tool's validate method and ensures absolute paths before validation. The rule for absolute paths is that all output options are made absolute relative to the job's working directory, while all input options are made absolute relative to the current working directory.

Job.is_done(force=False)

Delegates to the tool's validate method but also adds an additional check for streamed jobs. If there are no direct output files, this delegates to the follow-up jobs.

Parameters:force – if True, current state is ignored and a file check is forced
Job.get_output_files()

Yields a list of all output files for the configuration of this job. Only TYPE_OUTPUT options whose values are strings are considered. If a source for the option is not None, it has to be equal to this tool.

In addition, any pipe_targets are yielded as well, as the configuration might already have been changed to stream.

Returns:list of output files
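
A short sketch of how these helpers might be combined (the job id is a placeholder):

    import jip.db

    job = jip.db.get(1)  # hypothetical job id
    if job.is_stream_source():
        # stdout of this job is dispatched to other jobs; files that should
        # also receive the stream show up as pipe targets
        targets = job.get_pipe_targets()
    if job.is_done(force=True):
        print("outputs of", job.name, "look complete")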

Job states

JIP jobs can take one of the following states.

jip.db.STATE_HOLD = 'Hold'

Job is submitted but on hold

jip.db.STATE_QUEUED = 'Queued'

Job is submitted to the compute cluster and is queued for execution

jip.db.STATE_RUNNING = 'Running'

Job is currently running

jip.db.STATE_DONE = 'Done'

Job execution successfully completed

jip.db.STATE_FAILED = 'Failed'

Job execution failed

jip.db.STATE_CANCELED = 'Canceled'

Job was canceled by the user
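
For example, the state constants can be compared against the state attribute of jobs returned by get_active_jobs() (a sketch):

    import jip.db

    for job in jip.db.get_active_jobs():
        if job.state == jip.db.STATE_RUNNING:
            print("running:", job.id, job.name)
        elif job.state in (jip.db.STATE_QUEUED, jip.db.STATE_HOLD):
            print("waiting:", job.id, job.name)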

The Job class

class jip.db.Job(tool=None)

The JIP Job class that represents a job that is stored in the database.

A job can be referenced by its internal primary id, which is database specific, and its external job_id, which is set when the job is submitted to a compute cluster.

In addition to the id and the optional job_id, the job consists of a set of properties that wrap general features of the job, like the number of threads, max_memory, or a limiting wall clock time. The job instance also holds the current job state, messages, and references to upstream dependencies.

get_cluster_command()

Returns the command that should be sent to the cluster to run this job.

Returns:the command sent to the cluster
get_input_files()

Yields a list of all input files for the configuration of this job. Only TYPE_INPUT options whose values are strings are considered. If a source for the option is not None, it has to be equal to this tool.

Returns:list of input files
get_output_files()

Yields a list of all output files for the configuration of this job. Only TYPE_OUTPUT options whose values are strings are considered. If a source for the option is not None, it has to be equal to this tool.

In addition, any pipe_targets are yielded as well, as the configuration might already have been changed to stream.

Returns:list of output files
get_pipe_targets()

Returns a list of output files that the stdout content of this job will be written to if the job's output stream is also piped to some other process.

Returns:list of output files or an empty list
is_done(force=False)

Delegates to the tool's validate method but also adds an additional check for streamed jobs. If there are no direct output files, this delegates to the follow-up jobs.

Parameters:force – if True, current state is ignored and a file check is forced
is_stream_source()

Returns True if this job has child jobs that receive the output stream of this job

Returns:True if the job pipes its data to another job
is_stream_target()

Returns True if this job takes the output stream of at least one parent as input.

Returns:True if this Job receives its data as a stream from another job
restore_configuration()

Restores the tool's configuration to the state before any options were changed to support pipes.

Returns:original configuration
Return type:jip.options.Options
run()

Execute a single job. Note that no further checks on the job are performed and this method assumes that the job's stream_in and stream_out are properly connected.

Returns:the process
Raises Exception:
 if the interpreter was not found
terminate()

Terminate a currently running process that executes this job. NOTE that this method does NOT perform any cleanup operations or state updates, it simply terminates the underlying process.

validate()

Delegates to the tool's validate method and ensures absolute paths before validation. The rule for absolute paths is that all output options are made absolute relative to the job's working directory, while all input options are made absolute relative to the current working directory.

account

Account information assigned to the job

additional_options

Stores a set of additional input options that are used in template rendering but are not linked in the configuration of this job

archived

A job can be archived to hide finished jobs while keeping their information. This flag indicates whether the job is archived.

command

The fully rendered job command that will be executed by this job. NOTE that this is the final command executed by the job, NOT the command that is sent to the cluster. You can get the command sent to the cluster using the :py:meth:jip.db.Job.get_cluster_command method of the job.

configuration

The configuration that is used to populate the command template. This stores a version of the tool's Options instance.

create_date

Creation date of the job

dependencies

General job dependencies

env

Stores parts of the job environment to allow clean restarts and moves of a Job even if the user's current environment settings have changed. See create_job_env() for more information about the environment stored by default.

environment

Environment name (used for example as SGE parallel environment)

extra

Extra configuration stored as an array of additional parameters passed during job submission to the cluster implementation

finish_date

Finish date of the job

hosts

Optional name of the host that executes this job. This has to be set by the cluster implementation at runtime. If the cluster implementation does not support this, the field might not be set.

id

The primary job id

in_files

input file references

interpreter

The interpreter that will be used to run the command

job_id

The remote job id set after submission to a remote cluster

keep_on_fail

If explicitly set to True, Job output will not be removed in a cleanup step after a job failed or was canceled.

max_memory

Maximum memory assigned to a job in MB

max_time

Maximum wall clock time assigned to a job in minutes

name

User specified name for the job

nodes

Number of nodes assigned to a job. This is stored as a string in order to support node ranges. Defaults to None

on_success

embedded pipelines

out_files

output file references

path

Absolute path to the JIP script that created this job. This is currently only set for JIP scripts, not for tools that are loaded from a python module.

pipe_targets

Stores output files that were moved out of the configuration in order to support a dispatcher pipe that writes to the files in this list as well as to the stdin of other jobs

pipeline

Optional pipeline name to group jobs

pipeline_name

Optional user-defined pipeline name to differentiate pipelines

priority

Stores the priority assigned to the job. Interpretation of this field depends on the cluster implementation

project

Optional user specified project name

queue

Stores the name of the queue this job will be submitted to. Interpretation of this field depends on the cluster implementation

start_date

Start date of the job

state

Current job state. See job states <job_states> for more information

stderr

The job's stderr log file. This can contain placeholders like %J that are filled, for example, with the job id to create the final path. The cluster implementation provides a way to resolve such a path.

stdout

The job's stdout log file. This can contain placeholders like %J that are filled, for example, with the job id to create the final path. The cluster implementation provides a way to resolve such a path.

tasks

Number of tasks assigned to a job. Defaults to 0

tasks_per_node

Number of tasks per node. Defaults to 0

temp

This is used to mark jobs as temporary. Temporary jobs can be handled differently when jobs or pipelines are restarted or when a global cleanup function is called.

threads

Number of threads assigned to a job. Defaults to 1

tool

Get the tool instance that is associated with this job. If the tool is not set, it will be loaded using the jip.find() function

tool_name

Name of the tool

user

Stores the user name of the user that submitted the job

working_directory

The job's working directory. This defaults to the current working directory.
