I am trying to parallelize methods from a class using Dask on a PBS cluster.
My biggest challenge is that the method must parallelize some computations, then run further parallel computations on the result. Of course, this whole pipeline should itself be distributed on the cluster, so that similar computations run on other data.
The cluster is created:
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=4,
                     memory="10GB",
                     interface="ib0",
                     queue=queue,
                     processes=1,
                     nanny=False,
                     walltime="02:00:00",
                     shebang="#!/bin/bash",
                     env_extra=env_extra,
                     python=python_bin)
cluster.scale(8)
client = Client(cluster)
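(Aside: since cluster.scale(8) only requests workers and returns immediately, I can optionally block until they have actually joined, using client.wait_for_workers from dask.distributed:)

client.wait_for_workers(n_workers=8)  # optional: wait until the 8 workers have connected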
The class I need to distribute has two separate steps which have to run sequentially, since step1 writes a file that is read at the beginning of step2.
I have tried the following, putting both steps one after the other in a single method:
def computations(params):
    my_class(**params).run_step1(run_path)
    my_class(**params).run_step2()

chain = []
for p in params_compute:
    y = dask.delayed(computations)(p)
    chain.append(y)

dask.compute(*chain)
But it does not work: run_step1 only builds a delayed task graph without executing it, so the file does not exist yet when run_step2 tries to read it immediately. I need to find a way to force step1 to finish before step2 starts.
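To make the failure mode concrete, here is a minimal standalone sketch of the same pattern (write_file is a placeholder, not my real code):

import dask

def write_file(path):
    with open(path, "w") as f:
        f.write("data")
    return path

def computations(path):
    dask.delayed(write_file)(path)  # only builds a task graph; nothing runs yet
    return open(path).read()        # fails, the file was never written

dask.compute(dask.delayed(computations)("out.txt"))  # raises FileNotFoundError

The inner delayed is never computed, so by the time the read happens the file does not exist.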
I have tried to force the execution of the first step by adding a compute():

def computations(params):
    my_class(**params).run_step1(run_path).compute()
    my_class(**params).run_step2()
But it may not be a good idea, because when running dask.compute(*chain) I would ultimately be doing compute(compute()), which might explain why the second step is not executed?
What would the best approach be?
Should I include a persist() somewhere at the end of step1?
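If I understand correctly, persist() alone would not be enough: on a distributed cluster it starts the computation in the background and returns immediately, something like this (slow_write being a placeholder for my step1):

import dask
from dask.distributed import wait

d = dask.delayed(slow_write)("post_gpu.tif")
p = d.persist()  # returns immediately, work runs in the background
wait(p)          # I would still have to block explicitly before reading the file

So I would still need an explicit synchronization point before step2 reads the file.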
For reference, step1 and step2 are below:
def run_step1(self, path_step):
    preprocess_result = dask.delayed(self.run_preprocess)(path_step)
    gpu_result = dask.delayed(self.run_gpu)(preprocess_result)
    post_gpu = dask.delayed(self.run_postgpu)(gpu_result)  # writes a result file post_gpu.tif
    return post_gpu

def run_step2(self):
    # opens the file written at the end of step1
    data_file = rio.open(self.outputdir + "/post_gpu.tif").read()
    temp_result1 = self.process(data_file)
    final_merge = dask.delayed(self.merging)(temp_result1)
    write = dask.delayed(self.write_final)(final_merge)
    return write
This is only a rough suggestion, as I don't have a reproducible example as a starting point, but the key idea is to pass a delayed object to run_step2 to explicitly link it to run_step1. Note I'm not sure how essential it is for you to use a class in this case, but for me it's easier to pass the params as a dict explicitly.
def run_step1(params):
    # params is assumed to be a dict
    # path_step was not explicitly in the `for p in params_compute:` loop,
    # so I assume it can be stored in params and unpacked here, e.g.:
    path_step = params["path_step"]
    preprocess_result = run_preprocess(path_step, params)
    gpu_result = run_gpu(preprocess_result, params)
    post_gpu = run_postgpu(gpu_result, params)  # writes a result file post_gpu.tif
    return post_gpu
def run_step2(post_gpu, params):
    # post_gpu is not used directly; receiving it as an argument is what
    # makes Dask run this function only after run_step1 has finished
    # unpack params here if needed, e.g. (assuming outputdir is stored in params):
    outputdir = params["outputdir"]
    # opens the file written at the end of step1
    data_file = rio.open(outputdir + "/post_gpu.tif").read()
    temp_result1 = process(data_file, params)
    final_merge = merging(temp_result1, params)
    write = write_final(final_merge, params)
    return write
chain = []
for p in params_compute:
    y = dask.delayed(run_step1)(p)
    z = dask.delayed(run_step2)(y, p)  # y is passed in, so z depends on y
    chain.append(z)

dask.compute(*chain)
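As a side note, the same dependency can also be expressed with the futures interface (a sketch assuming the same run_step1/run_step2 as above): passing a future as an argument makes the scheduler wait for it before launching the dependent task.

futures = []
for p in params_compute:
    f1 = client.submit(run_step1, p)
    f2 = client.submit(run_step2, f1, p)  # f2 runs only after f1 has finished
    futures.append(f2)

results = client.gather(futures)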