jip.cluster

The JIP cluster module contains the main class that has to be extended to add cluster support, as well as helper functions to access the cluster instance.

Cluster implementations provide a minimal set of functionality that covers the following tasks:

  • submit jobs to a compute cluster
  • list currently running or queued jobs
  • cancel a job

In addition, a cluster implementation might provide the ability to:

  • resolve paths to log files
  • update job metadata

The current JIP release bundles implementations for the following grid engines:

  • Slurm
  • SGE
  • PBS/Torque
  • LSF

If you want to implement your own cluster integration, the class to extend is Cluster. In order to get a working implementation, implement at least the Cluster.submit() function. This will already allow you to submit jobs. All other functions are optional, but of course necessary if you want to provide the corresponding functionality. The main purpose of the submit method is to get your job onto a remote cluster. The parameter passed to the submit method is a Job instance. The job contains all available information about the execution, and the submit implementation is allowed and encouraged to update some of the fields of the job. Most importantly, make sure you set the job's job_id after successful submission. Other commonly updated fields are stdout and stderr, setting the correct paths to log files. Please take a look at the Cluster.resolve_log() function to see how log file names are handled. If you update these fields during submission, you are encouraged to include place-holders in the file names.

Note

You can get the command that should be sent to the cluster using jip.db.Job.get_cluster_command()! Please do NOT try to send the job's command directly. Job execution has to go through JIP in order to provide all functionality.
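
Below is a minimal, hypothetical sketch of a submit() implementation; the mysubmit command, its --out/--err flags, and the job id parsing are assumptions for illustration only:

import subprocess

import jip.logger
from jip.cluster import Cluster, SubmissionError

log = jip.logger.getLogger('my.module')


class MyCluster(Cluster):
    def submit(self, job):
        # 'mysubmit' and its flags stand in for your engine's
        # submission tool
        cmd = ['mysubmit', '--out', job.stdout, '--err', job.stderr]
        # pass the job's extra parameters as they are
        cmd.extend(job.extra or [])
        # always submit the wrapped JIP command, never job.command itself
        cmd.append(job.get_cluster_command())
        log.debug("Submitting job with: %s", cmd)
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        out, _ = process.communicate()
        if process.returncode != 0:
            raise SubmissionError("Submission failed for job: %s" % job)
        # assume the tool prints the remote job id on stdout
        job.job_id = out.strip()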

If you need to pass specific configuration to your cluster, DO NOT use mandatory initializer parameters. The cluster module has to be able to instantiate your class without any parameters. You can, however, use keyword arguments in order to allow easy manual instantiation, but defaults should be loaded from the JIP configuration. This is the preferred way for a user to configure the cluster instance. You have full access to the JIP configuration through the jip.config global variable, which holds an initialized instance of Config. Here is an example of how you can allow the user to add a custom configuration block and then use it to access configured values:

>>> import jip
>>> from jip.cluster import Cluster
>>> class MyCluster(Cluster):
...     def __init__(self):
...         # read the user's 'myconfig' block, defaulting to an empty dict
...         cfg = jip.config.get('myconfig', {})
...         self.myvalue = cfg.get('myvalue', 1)
>>>
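
A user could then set the value through a block like this in the JIP configuration (myconfig and myvalue are, of course, just example names):

{
    "myconfig": {
        "myvalue": 2
    }
}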

If you need to allow for custom configuration, please do not forget to document the supported blocks and fields that have to be added to the configuration.

If an error occurs during job submission, please raise a SubmissionError containing a useful error message. Please note also that you should use the jip.logger module and emit some useful logging statements. If you submit jobs by calling an external command, for example with Python's subprocess, please log the full command at debug log level. You can get a logger instance like this:

>>> import jip.logger
>>> log = jip.logger.getLogger('my.module')

Besides the cluster class, this module provides a get() function that can be used to get an instance of the currently configured cluster environment. The get() function always returns a cached version of the cluster instance, and all implementations should avoid storing instance variables that are job dependent.

Methods

jip.cluster.get(name=None)

Returns the currently configured cluster instance, using the class name set in the configuration if no explicit name is specified.

Parameters: name – explicitly specify the full class name of the cluster implementation
Returns: the Cluster instance
Return type: Cluster
Raises ClusterImplementationError: if the specified cluster implementation could not be loaded
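
For example, assuming a cluster is configured in your JIP configuration, you can fetch the cached instance and query the active jobs:

>>> import jip.cluster
>>> cluster = jip.cluster.get()
>>> active_ids = cluster.list()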

Abstract Cluster class

class jip.cluster.Cluster

Base class for cluster integrations.

In order to add support for a cluster engine or if you want to customize how jobs are submitted to your compute cluster, extend this class.

The most important function is submit(), which takes a Job instance and sends it to the compute cluster. The method does not return anything but is allowed to modify the submitted job. Usually, you want to update the job's jip.db.Job.job_id attribute to store the remote job id.

Please note that the list(), submit(), and cancel() methods raise a NotImplementedError by default. update() and resolve_log() are implemented with an empty body and no operation happens by default.

cancel(job)

Cancel the given job.

Parameters: job (jip.db.Job) – the job instance
list()

Returns a list of the ids of all jobs that are currently queued or running on the cluster.

Returns: list of job ids of active jobs
Return type: list of string
resolve_log(job, path)

Resolve cluster specific file pattern to get the path to a log file.

Log file paths support cluster engine specific place-holders, and this method takes care of resolving paths containing such patterns. For example, Slurm uses %j as a place-holder for the job id. This method resolves such cluster specific place-holders and returns the full path to the log file.

Parameters:
  • job (jip.db.Job) – the job instance
  • path (string) – log file name
Returns: the resolved log file path with any place-holders replaced
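
For example, with the Slurm implementation a %j place-holder in a log file name would be resolved to the job's remote id (illustrative values, assuming job.job_id is 1234):

>>> cluster = jip.cluster.get()
>>> cluster.resolve_log(job, 'jip-%j.out')
'jip-1234.out'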

submit(job)

Implement this method to submit jobs to the remote cluster.

Implementations are allowed and encouraged to modify the job instance. Usually, you want to update the job's jip.db.Job.job_id attribute to store the remote job id.

Please note that the job's extra field contains an array of additional parameters that are compatible with the cluster. This array of parameters should be passed as-is to the command used for job submission.

NOTE that you can get the command that should be sent to the cluster using jip.db.Job.get_cluster_command()! Please do NOT try to send the job's command directly. Job execution has to go through JIP in order to provide all functionality.

Parameters: job (jip.db.Job) – the job
Raises SubmissionError: if the submission failed
update(job)

Called during job execution to update a job and set properties that are cluster specific, for example, the hosts list.

Parameters: job (jip.db.Job) – the job

Exceptions

exception jip.cluster.SubmissionError

This exception is raised if a job submission failed.

exception jip.cluster.ClusterImplementationError

Exception raised in case the cluster implementation could not be loaded.

Implementations

class jip.cluster.Slurm

Slurm extension of the Cluster implementation.

The Slurm implementation sends jobs to the cluster using the sbatch command line tool. The job parameters are passed to sbatch as they are, but please note the following mappings (see the example after the list):

  • max_mem is passed as --mem-per-cpu
  • queue is used as the Slurm partition parameter
  • priority is used as the Slurm QOS parameter
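
For example (illustrative values, exact option formatting may differ), a job with max_mem=2048, queue "long", and priority "high" would contribute options along these lines to the sbatch call:

--mem-per-cpu 2048 --partition long --qos high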

The implementation supports a slurm configuration block in the JIP configuration, which can be used to customize the paths to the commands used (sbatch, scancel, and squeue). You can enable and configure the Slurm integration with a JIP configuration like this:

{
    "cluster": "jip.cluster.Slurm",
    "slurm": {
        "sbatch": "/path/to/sbatch",
        "squeue": "/path/to/squeue",
        "scancel": "/path/to/scancel"
    }
}

Note

By default, the implementation assumes that the commands are available in your PATH. If that is the case, you do not have to explicitly configure the paths to the commands.

class jip.cluster.SGE

SGE extension of the Cluster implementation.

The SGE submission can be configured using the global jip configuration. The implementation looks for a dictionary sge and supports the following settings:

  • threads_pe the name of the parallel environment used to submit multi-threaded jobs
  • qsub path to the qsub command
  • qstat path to the qstat command
  • qdel path to the qdel command
  • mem_limit the name of the resource used to specify the memory limit. The default is virtual_free. The parameter construction looks like this: -l <mem_limit>=<value>, where the value is the specified memory limit in MB.
  • time_limit the name of the resource used to specify the time limit. The default is s_rt. The parameter construction looks like this: -l <time_limit>=<value>, where the value is the maximum time in seconds.

You do not have to specify the command options if the commands are available in your PATH, but the threads_pe option has to be specified to be able to submit multi-threaded jobs.
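
A corresponding configuration block might look like this (the parallel environment name "threads" is just an example and depends on your site's SGE setup):

{
    "cluster": "jip.cluster.SGE",
    "sge": {
        "threads_pe": "threads",
        "qsub": "/path/to/qsub",
        "qstat": "/path/to/qstat",
        "qdel": "/path/to/qdel",
        "mem_limit": "virtual_free",
        "time_limit": "s_rt"
    }
}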

Parallel job submission is handled using the job's threads, tasks, and environment fields. Note that there is currently no support for specifying how parallel jobs are distributed throughout a set of nodes; this depends on the configuration of the queue and the parallel environment. If you specify tasks, this takes precedence over threads and is used as the parameter for the parallel environment. This is how the -pe parameter is constructed:

-pe <environment> <tasks|threads>
class jip.cluster.PBS

PBS/Torque extension of the Cluster implementation.

The PBS submission can be configured using the global jip configuration. The implementation looks for a dictionary pbs and supports the following settings:

  • qsub path to the qsub command
  • qstat path to the qstat command
  • qdel path to the qdel command

You do not have to specify the command options if the commands are available in your PATH.

Parallel jobs are allocated using -l nodes=<N>:ppn=<M>, where N is the number of nodes and M is the job's tasks_per_node, tasks, or threads, checked in this order for a value > 0. N is set to 1 by default. Submitting multi-threaded jobs can be achieved simply by specifying the number of threads; the job will then request a single node with M cpus. In order to submit MPI jobs, you have to specify the number of nodes explicitly. The total number of MPI processes is then N*M.
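
For instance, with illustrative values:

threads=8                   ->  -l nodes=1:ppn=8   (single node, 8 cpus)
nodes=4, tasks_per_node=8   ->  -l nodes=4:ppn=8   (32 MPI processes)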

class jip.cluster.LSF

LSF extension of the Cluster implementation.

The LSF submission can be configured using the global jip configuration. The implementation looks for a dictionary lsf and supports the following settings:

  • bsub path to the bsub command
  • bjobs path to the bjobs command
  • bkill path to the bkill command
  • limits specify either KB, MB, or GB, depending on how your LSF instance interprets memory limits (LSF_UNIT_FOR_LIMITS). By default, we assume that memory limits are specified in KB.

You do not have to specify the command options if the commands are available in your PATH.

Parallel jobs are submitted using the -n option to specify the number of threads/cpus requested. First, the job's tasks are checked and used as N. If no tasks are specified, the job's threads are used. In case the job's threads are used, the job is submitted to a single node using -R span[hosts=1]. If you do not specify tasks_per_node explicitly, but do specify a number of nodes, the number of hosts requested is adjusted accordingly. If tasks_per_node is specified, it takes precedence and the job is submitted using -R span[ptile=M], where M is the number of tasks_per_node.
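
For instance, with illustrative values:

threads=8                    ->  -n 8 -R "span[hosts=1]"
tasks=16, tasks_per_node=4   ->  -n 16 -R "span[ptile=4]"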
