I have a processing pipeline built with Luigi. In one of its phases I perform a series of calculations on a DataFrame, and to speed this up I decided to use a local Dask cluster. When I run the code directly in Python or Jupyter, the cluster starts and everything works, but when it runs inside Luigi I get the following warning:
```
UserWarning: Failed to start diagnostic server on port 8787.
```
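```python
import pandas as pd
from dask.distributed import Client

def func(params):
    df = params.get('df')
    client = Client()  # a new local cluster is started on every call (never explicitly closed)
    # `sample` is my own function, applied to each row
    result = [client.submit(sample, row) for index, row in df.iterrows()]
    result = client.gather(result)
    new_df = pd.DataFrame(result)
    return new_df

# Called from the Luigi task:
df = func(params)
df.to_csv('...')
```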
How can I solve this?
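One plausible reading of the warning (an assumption; the question does not show what else is running) is that the dashboard tries to listen on port 8787 and something else, such as a `Client()` started by an earlier call of `func` or by another Luigi worker, already holds that port. The stdlib-only sketch below reproduces that kind of bind failure with plain sockets; it does not use Dask, and `port_in_use_demo` is a hypothetical helper. For Dask itself, the `LocalCluster` documentation describes `dashboard_address=":0"` (random port) and `dashboard_address=None` (no dashboard), and these keywords can be passed through `Client(...)` when it creates a local cluster.

```python
import socket


def port_in_use_demo() -> bool:
    """Return True if binding a second socket to an already-listening port fails."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    first.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    first.listen()
    port = first.getsockname()[1]
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        second.bind(("127.0.0.1", port))  # same address and port as `first`
    except OSError:
        return True  # "address already in use", like a second dashboard on 8787
    else:
        return False
    finally:
        second.close()
        first.close()


if __name__ == "__main__":
    print(port_in_use_demo())
```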
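This is untested code (I have no experience with Luigi). How about the following, as a separate module? It creates a single `Client` under the `if __name__ == "__main__":` guard, and `func` reuses that client instead of starting a new cluster on every call: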
```python
import pandas as pd
from dask.distributed import Client


def func(params):
    df = params.get('df')
    # `client` is the single Client created in the __main__ block below
    result = [client.submit(sample, row) for index, row in df.iterrows()]
    result = client.gather(result)
    new_df = pd.DataFrame(result)
    return new_df


if __name__ == "__main__":
    # `params` and `sample` are the ones from the question's code
    with Client() as client:
        df_result = func(params)
        df_result.to_csv('...')
```
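The core idea above is to create one client and reuse it, rather than starting a cluster inside `func` on every call. A variant that avoids relying on a module-level global is to pass the client into `func` as a parameter (Dask's `Client` provides both `submit` and `gather`). The sketch below shows that shape using the standard library's `concurrent.futures` as a stand-in for Dask, so it runs without Dask or pandas; `sample` and the row dicts are hypothetical placeholders, and collecting `f.result()` plays the role of `client.gather`.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def sample(row: dict) -> dict:
    # Hypothetical per-row computation standing in for the question's `sample`.
    return {"id": row["id"], "doubled": row["value"] * 2}


def func(rows: list, executor: Executor) -> list:
    # The executor is created by the caller and reused; func never starts its own.
    futures = [executor.submit(sample, row) for row in rows]  # like client.submit
    return [f.result() for f in futures]  # like client.gather, order preserved


if __name__ == "__main__":
    rows = [{"id": i, "value": i * 10} for i in range(3)]
    # Create the pool once and share it across calls.
    with ThreadPoolExecutor(max_workers=2) as executor:
        print(func(rows, executor))
        print(func(rows[:1], executor))
```

With Dask, the equivalent would be `with Client() as client: func(params, client)`, with `func` calling `client.submit` and `client.gather` as in the answer. For CPU-bound work, `ProcessPoolExecutor` would replace the thread pool, and the `__main__` guard then becomes essential, because platforms that spawn worker processes re-import the main module.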