Tags: python, pandas, dask, dask-distributed, dask-delayed

Using Dask to download, process, and save to csv


Problem

Part of my workflow involves downloading hundreds of thousands of files, parsing the data, and then saving the results to CSV locally. I'm trying to set this workflow up with Dask, but it does not appear to be processing in parallel. The Dask dashboard shows low CPU % for each worker, the Tasks tab is empty, and the Status page doesn't show anything either. htop shows no more than one or two tasks "running" at a time. I'm not sure how to proceed from here.

Related: How should I write multiple CSV files efficiently using dask.dataframe? (an older question that this question builds on)

Example

from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing


def get_fn(dat):    
    ### Download file and unzip based on input dat
    url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
    wget.download(url)
    indat = unzip()

    ### Process file
    outdat = proc_dat(indat)
    
    ### Save file
    outdat.to_csv('file_path')

    ### Trash collection with custom download fn
    delete_downloads()


if __name__ == '__main__':

    ### Dask setup    
    NCORES = multiprocessing.cpu_count() - 1
    client = Client(n_workers=NCORES, threads_per_worker=1)

    ### Build df of needed dates and variables    
    beg_dat = "2020-01-01"
    end_dat = "2020-01-31"
    date_range = pd.date_range(beg_dat, end_dat)
    var = ["var1", "var2"]

    lst_ = [(x, y) for x in date_range for y in var]
    date = [x[0] for x in lst_]
    var = [x[1] for x in lst_]

    indf = pd.DataFrame({'date': date, 'var': var}).reset_index()

    ### Group by each row to process
    gb = indf.groupby('index')
    gb_i = [gb.get_group(x) for x in gb.groups]

    ### Start dask using delayed
    compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')

Dashboard

(Dashboard screenshots: the Status, Task Stream, and Workers pages show no activity and low CPU usage.)


Solution

  • In this line:

    compute([...], scheduler='processes')
    

    you explicitly request a scheduler other than the distributed one you set up earlier in the script. If you do not specify scheduler= here, Dask will use the distributed client, which became the default scheduler when you created it, and you will see tasks appear in the dashboard. A corrected sketch follows the note below.

    Note that you might still not see high CPU usage, since it seems likely that most of the time is spent waiting for downloads rather than computing.
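
For reference, a minimal sketch of the corrected setup, reusing get_fn and gb_i from the question (the download and processing helpers remain placeholders from the original post):

from dask import compute
from dask.delayed import delayed
from dask.distributed import Client
import multiprocessing

if __name__ == '__main__':
    NCORES = multiprocessing.cpu_count() - 1
    client = Client(n_workers=NCORES, threads_per_worker=1)

    # ... build indf and gb_i exactly as in the question ...

    tasks = [delayed(get_fn)(thisRow) for thisRow in gb_i]

    # No scheduler= argument: the Client created above is already
    # the default scheduler, so these tasks run on the distributed
    # workers and appear in the dashboard.
    compute(tasks)

Since the tasks are mostly waiting on network IO, you could also experiment with a higher threads_per_worker value to keep more downloads in flight per process; that is an assumption about the workload, so it is worth profiling.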