Problem
Part of my workflow involves downloading hundreds of thousands of files, parsing the data, and then saving it to CSV locally. I'm trying to set this workflow up with Dask, but it does not appear to be processing in parallel. The Dask dashboard shows low CPU % for each worker, and the task tab is empty. The status tab doesn't show anything either. htop doesn't show more than 1 or 2 processes "running" at a time. I'm not sure how to proceed from here.
Related: How should I write multiple CSV files efficiently using dask.dataframe? (Older question that this question is based on)
Example
from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing


def get_fn(dat):
    # unzip(), proc_dat() and delete_downloads() are custom helpers
    # defined elsewhere in the real script.
    ### Download file and unzip based on input dat
    url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
    wget.download(url)
    indat = unzip()
    ### Process file
    outdat = proc_dat(indat)
    ### Save file
    outdat.to_csv('file_path')
    ### Trash collection with custom download fn
    delete_downloads()


if __name__ == '__main__':
    ### Dask setup
    NCORES = multiprocessing.cpu_count() - 1
    client = Client(n_workers=NCORES, threads_per_worker=1)

    ### Build df of needed dates and variables
    beg_dat = "2020-01-01"
    end_dat = "2020-01-31"
    date_range = pd.date_range(beg_dat, end_dat)
    var = ["var1", "var2"]
    lst_ = [(x, y) for x in date_range for y in var]
    date = [x[0] for x in lst_]
    var = [x[1] for x in lst_]
    indf = pd.DataFrame({'date': date, 'var': var}).reset_index()

    ### Group by each row to process
    gb = indf.groupby('index')
    gb_i = [gb.get_group(x) for x in gb.groups]

    ### Start dask using delayed
    compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')
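For reference, the input-building step above is the Cartesian product of dates and variables, split into one single-row DataFrame per task (which is what the groupby on 'index' produces). A minimal pandas sketch of the same structure, using a hypothetical helper name build_task_rows and assuming get_fn expects exactly these one-row frames:

```python
import itertools

import pandas as pd


def build_task_rows(beg, end, variables):
    # Hypothetical helper: same (date, var) grid as the example, in the
    # same order (all variables for the first date, then the next date).
    dates = pd.date_range(beg, end)
    indf = pd.DataFrame(
        list(itertools.product(dates, variables)), columns=["date", "var"]
    ).reset_index()
    # One single-row DataFrame per task, equivalent to gb_i above.
    return [indf.iloc[[i]] for i in range(len(indf))]


if __name__ == "__main__":
    rows = build_task_rows("2020-01-01", "2020-01-31", ["var1", "var2"])
    print(len(rows), "tasks")
```

Each element can then be wrapped in delayed(get_fn) exactly as in the original loop.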
Dashboard (screenshot not preserved)
In this line:
compute([...], scheduler='processes')
you explicitly select the multiprocessing scheduler instead of the distributed Client you set up earlier in the script. Creating a Client registers it as the default scheduler, so if you omit scheduler= here, compute() will run on that client and you will see tasks appear in the dashboard.
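A minimal sketch of the fix, with a hypothetical process_one standing in for get_fn. It assumes an in-process cluster (processes=False) only to keep the example self-contained; the question's Client(n_workers=NCORES, threads_per_worker=1) works the same way. The only change that matters is that compute() is called without scheduler=:

```python
import dask
from dask.distributed import Client


def process_one(i):
    # Hypothetical stand-in for get_fn: the real version would download,
    # parse and save one file.
    return i * i


def run_on_client(n_tasks=10):
    # Creating a Client makes it the default scheduler for dask.compute().
    # processes=False (an assumption for this sketch) runs the workers as
    # threads inside this process.
    client = Client(processes=False, n_workers=1, threads_per_worker=4)
    try:
        tasks = [dask.delayed(process_one)(i) for i in range(n_tasks)]
        # No scheduler= argument: the tasks are sent to the Client above
        # and show up in its dashboard.
        results = dask.compute(*tasks)
    finally:
        client.close()
    return list(results)


if __name__ == "__main__":
    print(run_on_client())
```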
Note that you might still not see high CPU usage, since most of the time is likely spent waiting for downloads rather than computing.
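To illustrate why waiting on downloads keeps CPU low, here is a stdlib-only sketch (plain threads, not Dask) in which time.sleep() stands in for network I/O; fake_download and measure are hypothetical names. Overlapping the waits cuts the wall time, while the CPU time used by the process stays close to zero:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(i, delay=0.2):
    # Stand-in for a network download: the thread just waits, which is
    # what sleep() models here, and uses almost no CPU.
    time.sleep(delay)
    return i


def measure(n_files=8, n_threads=8, delay=0.2):
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # CPU time of all threads in the process
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        results = list(pool.map(lambda i: fake_download(i, delay), range(n_files)))
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    return results, wall, cpu


if __name__ == "__main__":
    results, wall, cpu = measure()
    print(f"{len(results)} downloads, wall {wall:.2f}s, CPU {cpu:.2f}s")
```

With 8 simulated downloads of 0.2 s each on 8 threads, the wall time is roughly one delay rather than eight, and the CPU figure stays small, which mirrors the low per-worker CPU seen in the dashboard.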