Tags: dask, dask-dataframe, snorkel

I'm using Dask to apply Snorkel LabelingFunctions to multiple datasets, but it seems to take forever. Is this normal?


My problem is as follows: I have several datasets (900K, 1.7M and 1.7M entries) in CSV format, which I load into separate Dask DataFrames. I then concatenate them into a single Dask DataFrame that I feed to my Snorkel applier, which applies a set of labeling functions to each row of the DataFrame and returns a NumPy array with as many rows as the DataFrame and as many columns as there are labeling functions.

The call to the Snorkel applier seems to take forever when I run it on all 3 datasets (more than 2 days...). However, if I run the code on only the first dataset, the call takes around 2 hours. In that case, of course, I skip the concatenation step.

So I was wondering: how can this be? Should I change the number of partitions of the concatenated DataFrame? Or maybe I'm using Dask badly in the first place?

Here is the code I'm using:

from snorkel.labeling.apply.dask import DaskLFApplier
import dask.dataframe as dd
import numpy as np
import os
import time
from datetime import timedelta

start = time.time()

# lfs are the labeling functions to be applied; one of them featurizes one of
# the columns of my DataFrame and applies a sklearn classifier (I set n_jobs
# to None when loading the model)
applier = DaskLFApplier(lfs)

# If I have only one CSV to read
if isinstance(PATH_TO_CSV, str):
    training_data = dd.read_csv(PATH_TO_CSV, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'})
    slices = None
 
# If I have several CSVs
elif isinstance(PATH_TO_CSV, list):
    training_data_list = [dd.read_csv(path, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'}) for path in PATH_TO_CSV]
    training_data = dd.concat(training_data_list, axis=0)

    # record where to slice the final result so that each part can be assigned back to its dataset
    df_sizes = [len(df) for df in training_data_list]
    cut_idx = np.insert(np.cumsum(df_sizes), 0, 0)
    slices = list(zip(cut_idx[:-1], cut_idx[1:]))

# The call that takes forever: all the code above, without this line, runs perfectly fine on my 3 datasets
L_train = applier.apply(training_data)

end = time.time()
print('Time elapsed: {}'.format(timedelta(seconds=end-start)))
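
For completeness, this is roughly how I slice the final result afterwards, so that each part of L_train can be assigned back to its dataset (simplified, variable name illustrative):

# split the stacked label matrix back into one block per input dataset,
# using the (start, end) pairs computed from the cumulative dataset sizes
# (L_train_per_dataset is just an illustrative name)
if slices is not None:
    L_train_per_dataset = [L_train[start:end] for start, end in slices]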

If you need more info, I will try to provide as much as I can. Thanks in advance for your help :)


Solution

  • It seems that, by default, the applier function uses processes, so it does not benefit from any additional workers you might have available:

    # add this to the beginning of your code
    from dask.distributed import Client
    client = Client()
    # you can see the address of the client by typing `client` and opening the dashboard
    
    # skipping your other code
    
    # you need to pass the client explicitly to the applier
    # after launching this open the dashboard and watch the workers work :)
    L_train = applier.apply(training_data, scheduler=client)
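
  • As for the number of partitions: it may also help to repartition the concatenated DataFrame so that rows are spread evenly across the workers, since dd.concat simply stacks the partitions of its inputs. A minimal sketch reusing the client and training_data from above (npartitions=8 is an illustrative value, not a recommendation):

    # rebalance the concatenated DataFrame before applying the labeling
    # functions; npartitions=8 is illustrative (a few partitions per
    # worker core is a common rule of thumb)
    training_data = training_data.repartition(npartitions=8)
    L_train = applier.apply(training_data, scheduler=client)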