Tags: python, dask, python-xarray, dask-delayed

Running Embarrassingly Parallel operations on a single piece of data using Dask


I was following this tutorial and was able to parallelize a for loop where operations were done independently on multiple files. However, I now need to run an iterative function that extracts variables from a single xarray dataset (source) into 300 files. My function looks like this:

def build_data(parameters):
    # extract var1 from source
    # extract var2 from source
    # extract var3 from source
    # ..........
    return 'finished'

I have set up 6 workers, and to run this function 300 times in a loop I implemented:

source = xr.open_dataset()
build_delayed = []
for i in range(300):
    task = dask.delayed(build_data)(params)
    build_delayed.append(task)
dask.compute(*build_delayed)

But this is not working, and it seems like latency builds up as all the workers try to access the same piece of data. Also, when the loop ends, None is returned in build_delayed[].

What am I missing here? How do I make it work in parallel in this case?

(My detailed question.)
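(For context, a common pattern when many tasks share one large object under the distributed scheduler is to scatter it to the workers once, so each delayed call receives a lightweight reference instead of re-serializing the dataset for every task. This is only a sketch: the file name and the extra arguments to build_data are assumptions for illustration.)

import dask
import xarray as xr
from dask.distributed import Client

client = Client(n_workers=6)               # local cluster with 6 workers

source = xr.open_dataset('data.nc')        # hypothetical file name
# Ship the dataset to every worker once; tasks then reference it cheaply.
source_ref = client.scatter(source, broadcast=True)

build_delayed = []
for i in range(300):
    # build_data is assumed here to take the dataset and an index
    task = dask.delayed(build_data)(source_ref, i)
    build_delayed.append(task)

results = dask.compute(*build_delayed)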

EDIT: MVCE:

stations = []
for key in valid_stations:
    station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/' + key + '.csv')
    station.reset_index(drop=True, inplace=True)
    station.set_index('Date', inplace=True)
    station.index = pd.to_datetime(station.index)
    stations.append(station)

routedat = xr.open_dataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
    
def build_data(stations, routedat, i):
    try:
        start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
        lis_date = routedat['time'].values
        return (start_date, lis_date)
    except Exception as e:
        # start_date may be unbound if the lookup itself failed
        return (None, str(e))
    
build_delayed = []
for i in range(10):
    task = dask.delayed(build_data)(stations, routedat, i)
    build_delayed.append(task)

dask.compute(*build_delayed)

I am getting the required output, but the time taken is far too large compared to the sequential loop (508 ms for the sequential loop vs. 31 seconds with Dask).

UPDATE: I was able to run it successfully in under 300 ms by calling .compute(scheduler='synchronous') (the single-threaded scheduler).
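For reference, a short sketch of the scheduler keyword used above; the scheduler names are standard Dask options. The synchronous scheduler runs the whole graph in a single thread (which can be fastest when per-task work is tiny), while 'threads' and 'processes' execute tasks concurrently:

import dask

dask.compute(*build_delayed, scheduler='synchronous')  # single thread, no parallelism
dask.compute(*build_delayed, scheduler='threads')      # thread pool (default for delayed)
dask.compute(*build_delayed, scheduler='processes')    # process pool, sidesteps the GIL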


Solution

  • This is just a guess, but did you try to create a reduce function and compute it?

    Something like:

    @dask.delayed
    def delayed_reduce(array):
        out = ''
        for entry in array:
            print(entry)
            out = out + str(entry) + ' '
        return out

    build_delayed = []
    for i in range(300):
        task = dask.delayed(build_data)(params)
        build_delayed.append(task)
    out = delayed_reduce(build_delayed)
    value = out.compute()
    

    This is just a skeleton of the code, but you get the idea. If you want to debug what is going on, add a different return value and also print it to some file, as in the sketch below.
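    A hypothetical way to do that (the wrapper name and log path below are made up for illustration) is to wrap build_data so each task logs a distinctive line when it actually runs:

    import os

    @dask.delayed
    def build_data_logged(params, i):
        result = build_data(params)  # the function from the question
        # Append one marker line per task; with a process-based scheduler
        # the lines from different workers may interleave.
        with open('/tmp/dask_debug.log', 'a') as fh:
            fh.write('task %d -> %r (pid %d)\n' % (i, result, os.getpid()))
        return result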

    Also, you did not mention how you are running Dask. Are you using distributed? If so, make sure you have initialized the workers and scheduler, e.g. as sketched below.
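    A minimal sketch, assuming a local distributed setup (the worker count just mirrors the 6 workers mentioned in the question):

    from dask.distributed import Client

    # Starts a local scheduler and workers; subsequent compute() calls use them.
    client = Client(n_workers=6, threads_per_worker=1)
    print(client)  # shows the dashboard address and worker count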

    Hopefully this skeleton code will help you locate the issue.