I was following this tutorial and I was able to parallelize a for loop where operations were done independently on multiple files. However, now I need to perform an iterative function to extract variables into 300 files from a single xarray dataset (source). My functions looks like this :
def build_data(parameters)
extract var1 from source
extract var2 from source
extract var3 from source
..........
return('finished')
I have set 6 workers and to run this function in a loop 300 times, I implemented :
source = xr.opendataset()
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
dask.compute(*build_delayed)
But this is not working and seems like latency is building up as all workers are trying access same piece of data. Also, when the loop ends, None
is returned in build_delayed[ ].
What am I missing here? How do I make it work in parallel in this case?
(My detailed question.)
EDIT : MVCE :
stations = []
for key in valid_stations:
station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/'+key+'.csv')
station.reset_index(drop = True,inplace=True)
station.set_index('Date',inplace=True)
station.index = pd.to_datetime(station.index)
stations.append(station)
routedat = xr.opendataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
def build_data(stations,routedat,i) :
try :
start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
lis_date = routedat['time'].values
return (start_date,lis_date)
except Exception as e :
return( start_date,str(e))
for i in range (0,10):
task = dask.delayed (build_data) (stations,routedat,i)
build_delayed.append(task)
dask.compute(*build_delayed)
I am getting the required output but the time taken is too large as compared to sequential loop (508 ms vs 31 seconds).
UPDATE : I was able to run it successfully in < 300 ms in parallel by using command .compute(scheduler='synchronous')
This is just a guess, yet did you try to create a reduce function and compute it?
Something like:
@dask.delayed
def delayed_reduce(array):
out = ''
for entry in array:
print (entry)
out = out + str(entry) + ' '
return out
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
out = delayed_reduce(build_delayed)
value = out.compute()
This is just skeleton of the code, yet you get the idea. If you want to debug what is going on, add a different return value and also print it to some file.
Also, you did not mention how you run dask. Are you using distributed? If you do, make sure you have initialized the workers and scheduler.
Hopefully this skeleton code will help you locate the issue.