Tags: python-3.x, dask-distributed, luigi

How to run Dask Client via call from another script?


I have a processing pipeline built with Luigi. In one of its phases I perform a series of calculations on a DataFrame, and to speed this up I decided to use a local Dask cluster. When I run the code directly from Python or Jupyter, the cluster starts and everything runs correctly, but when it runs inside Luigi I get the following error:

UserWarning: Failed to start diagnostic server on port 8787.

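import pandas as pd
from dask.distributed import Client

def func(params):
    df = params.get('df')
    client = Client()  # starts a new local cluster on every call
    # `sample` is the per-row function defined elsewhere in the pipeline
    futures = [client.submit(sample, row) for index, row in df.iterrows()]
    result = client.gather(futures)
    new_df = pd.DataFrame(result)
    return new_df

df = func(params)
df.to_csv('...')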

How to solve this?
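For context: the warning comes from the Dask diagnostics dashboard, which a new local cluster tries to serve on port 8787 by default. A plausible cause here (an assumption, since the question does not show what else is running) is that the port is already taken, for example by a cluster started from Jupyter or by an earlier call to `func`, which creates a new `Client()` each time and never closes it. A minimal sketch of `func` that closes its client and lets the dashboard use any free port might look like this. `sample` is a hypothetical stand-in for the asker's function, and `processes=False` (threads in a single process) is chosen only so the sketch also runs without a `__main__` guard:

```python
import pandas as pd
from dask.distributed import Client


def sample(row):
    # Hypothetical stand-in for the question's per-row function
    return {"value": row["value"] * 2}


def func(params):
    df = params.get("df")
    # dashboard_address=":0" asks for any free port instead of 8787;
    # dashboard_address=None would disable the dashboard entirely.
    # The with-block closes the client and its local cluster on exit,
    # so repeated calls do not leave old clusters holding ports.
    with Client(processes=False, dashboard_address=":0") as client:
        futures = [client.submit(sample, row) for _, row in df.iterrows()]
        result = client.gather(futures)
    return pd.DataFrame(result)
```

Closing the client on every call trades some startup cost for not leaking clusters; if `func` runs many times in one process, creating one client up front and reusing it is usually cheaper.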


Solution

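  • This is untested code (I have no experience with Luigi).
    How about the following code, placed in a separate module? The client is created once, under an `if __name__ == "__main__":` guard, rather than inside `func`, so importing the module does not start a cluster: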

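    import pandas as pd
    from dask.distributed import Client

    def func(params, client):
        df = params.get('df')
        # `sample` is the per-row function from the question
        futures = [client.submit(sample, row) for index, row in df.iterrows()]
        result = client.gather(futures)
        new_df = pd.DataFrame(result)
        return new_df

    if __name__ == "__main__":
        # Start the client only when the module runs as a script;
        # the with-block closes it (and its cluster) on exit
        with Client() as client:
            df_result = func(params, client)
        df_result.to_csv('...')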