Tags: cluster-computing, dask, dask-distributed, pbs, dask-jobqueue

Custom job script submission to PBS via Dask?


I have a PBS job script that runs an executable and writes its results to an out file.

### some lines

PBS_O_EXEDIR="path/to/software"
EXECUTABLE="executablefile"
OUTFILE="out"

### Copy the application directory to the compute node

[ -d $PBS_O_EXEDIR ] || mkdir -p $PBS_O_EXEDIR
[ -w $PBS_O_EXEDIR ] && \
rsync -Cavz --rsh=$SSH $HOST:$PBS_O_EXEDIR `dirname $PBS_O_EXEDIR`

[ -d $PBS_O_WORKDIR ] || mkdir -p $PBS_O_WORKDIR
rsync -Cavz --rsh=$SSH $HOST:$PBS_O_WORKDIR `dirname $PBS_O_WORKDIR`

# Change into the working directory
cd $PBS_O_WORKDIR

# Save the jobid in the outfile
echo "PBS-JOB-ID was $PBS_JOBID" > $OUTFILE

# Run the executable
$PBS_O_EXEDIR/$EXECUTABLE >> $OUTFILE

In my project, I have to use Dask to submit these jobs and monitor them. Therefore, I have configured the jobqueue.yaml file like this:

jobqueue:
     pbs:
         name: htc_calc

         # Dask worker options
         cores: 4                 # Total number of cores per job
         memory: 50GB                # Total amount of memory per job

         # PBS resource manager options
         shebang: "#!/usr/bin/env bash"
         walltime: '00:30:00'
         exe_dir: "/home/r/rb11/softwares/FPLO/bin"
         executable: "fplo18.00-57-x86_64"
         outfile: "out"

         job-extra: "exe_dir/executable >> outfile"

However, I got this error while submitting jobs via Dask.

qsub: directive error: e


tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f3d8c4a56a8>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at /home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py:284> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nqsub /tmp/tmpwyvkfcmi.sh\nstdout:\n\nstderr:\nqsub: directive error: e \n\n',)>)
Traceback (most recent call last):
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
    future.result()
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py", line 317, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py", line 41, in _
    await self.start()
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 285, in start
    out = await self._submit_job(fn)
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 268, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 368, in _call
    "stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)
RuntimeError: Command exited with non-zero exit code.
Exit code: 1
Command:
qsub /tmp/tmpwyvkfcmi.sh
stdout:

stderr:
qsub: directive error: e

How do I specify a custom bash script in Dask?


Solution

  • Dask is used for distributing Python applications. In the case of Dask Jobqueue, it works by submitting a scheduler and workers to the batch system, which connect together to form their own cluster. You can then submit Python work to the Dask scheduler.

    From your example, it looks like you are trying to use the cluster setup configuration to run your own bash application instead of Dask.

    In order to do this with Dask, you should return the jobqueue config to the defaults and instead write a Python function that calls your bash script, for example:

    import os

    from dask_jobqueue import PBSCluster
    cluster = PBSCluster()
    cluster.scale(jobs=10)    # Deploy ten single-node jobs

    from dask.distributed import Client
    client = Client(cluster)  # Connect this local process to remote workers

    client.submit(os.system, "/path/to/your/script")  # Run the script on a worker
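
    If you want to keep the behaviour of your PBS script, one option is a small Python wrapper around the executable that you submit to the workers instead of os.system. The sketch below is only illustrative: the run_fplo helper is a made-up name, the paths are copied from your config above, and it assumes the client created in the previous snippet.

    import os
    import subprocess

    # Values taken from the question's script/config; adjust as needed
    EXE_DIR = "/home/r/rb11/softwares/FPLO/bin"
    EXECUTABLE = "fplo18.00-57-x86_64"
    OUTFILE = "out"

    def run_fplo(workdir):
        """Run the executable in workdir and append its output to OUTFILE."""
        os.makedirs(workdir, exist_ok=True)
        outpath = os.path.join(workdir, OUTFILE)
        with open(outpath, "a") as out:
            # Record which batch job ran this task, if the variable is set
            out.write("PBS-JOB-ID was {}\n".format(os.environ.get("PBS_JOBID", "unknown")))
            result = subprocess.run(
                [os.path.join(EXE_DIR, EXECUTABLE)],
                cwd=workdir,
                stdout=out,
                stderr=subprocess.STDOUT,
            )
        return result.returncode

    # Submit one task per working directory and gather the exit codes
    futures = client.map(run_fplo, ["/path/to/calc1", "/path/to/calc2"])
    exit_codes = client.gather(futures)

    Note that the rsync staging in your original script is not reproduced here; the software and working directories would need to be reachable from the compute nodes, for example via a shared filesystem.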
    

    However, it seems like Dask may not be a good fit for what you are trying to do. You would probably be better off just submitting your job to PBS normally.