python, dask, dask-distributed, dask-delayed

Why doesn't dask execute in parallel?


Could someone point out what I did wrong with the following dask implementation, since it doesn't seem to use multiple cores?

[Updated with reproducible code]

The code that uses dask:

import time

import numpy as np
import pandas as pd
import dask

bookingID = np.arange(1, 10000)
book_data = pd.DataFrame(np.random.rand(1000))  # dummy data, shared by every booking
def calculate_feature_stats(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())

    return row


calculate_feature_stats = dask.delayed(calculate_feature_stats)


rows = []


for bookid in bookingID.tolist():
    row = calculate_feature_stats(bookid)
    rows.append(row)

start = time.time()
rows = dask.persist(*rows)
end = time.time()
print(end - start)  # Execution time = 16s on my machine

Code with the normal implementation, without dask:

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats_normal(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
    return row


rows = []
start = time.time()
for bookid in bookingID.tolist():
    row = calculate_feature_stats_normal(bookid)
    rows.append(row)
end = time.time()
print(end - start)  # Execution time = 4s on my machine

So without dask it is actually faster; how is that possible?


Solution

  • Answer

    Extended comment. You should consider that dask adds roughly 1 ms of overhead per task (see the docs), so if each computation is shorter than that, dask isn't worth the trouble.
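
    One way to keep that overhead from dominating is to hand each delayed call a batch of bookingIDs rather than a single one, so the per-task cost is amortised over real work. A minimal sketch, not your exact workload (the names stats_for_batch and the batch count of 8 are just for illustration):

    import dask
    import numpy as np
    import pandas as pd

    book_data = pd.DataFrame(np.random.rand(1000))

    @dask.delayed
    def stats_for_batch(ids):
        # One task processes a whole batch, so the ~1 ms scheduling
        # overhead is paid once per batch instead of once per id.
        return [[i,
                 book_data.min(),
                 book_data.max(),
                 book_data.std(),
                 book_data.mean()] for i in ids]

    batches = [stats_for_batch(chunk) for chunk in np.array_split(np.arange(1, 10000), 8)]
    results = dask.compute(*batches)

    Even then, a computation this small may not beat the plain loop; batching only removes scheduling overhead, it doesn't make the work itself bigger.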

    Coming to your specific question, I can think of two possible real-world scenarios:

    1. A big dataframe with a bookingID column and a value column
    2. A different file for every bookingID

    In the second case you can start from this answer (see also the short sketch after the Dask timings below), while for the first case you can proceed as follows:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    
    # create dummy df
    df = []
    for i in range(10_000):
        df.append(pd.DataFrame({"id":i,
                                "value":np.random.rand(1000)}))
    df = pd.concat(df, ignore_index=True)
    df = df.sample(frac=1).reset_index(drop=True)
    df.to_parquet("df.parq")
    

    Pandas

    %%time
    df = pd.read_parquet("df.parq")  # the whole file is loaded into memory
    out = df.groupby("id").agg({"value": ["min", "max", "std", "mean"]})
    out.columns = [col[1] for col in out.columns]  # flatten the MultiIndex columns
    out = out.reset_index(drop=True)
    
    CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
    Wall time: 1.08 s
    

    Dask

    %%time
    df = dd.read_parquet("df.parq")  # lazy: data is only read at compute()
    out = df.groupby("id").agg({"value": ["min", "max", "std", "mean"]}).compute()
    out.columns = [col[1] for col in out.columns]  # flatten the MultiIndex columns
    out = out.reset_index(drop=True)
    
    CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
    Wall time: 3.94 s
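
    For the second scenario (a separate file for every bookingID), the natural unit of work is one file, which is usually large enough to amortise the per-task overhead. A minimal sketch, assuming a hypothetical layout of CSV files like data/booking_1.csv, each with a value column:

    import glob

    import dask
    import pandas as pd

    @dask.delayed
    def one_file_stats(path):
        # Each task reads and reduces one whole file.
        s = pd.read_csv(path)["value"]
        return {"file": path, "min": s.min(), "max": s.max(),
                "std": s.std(), "mean": s.mean()}

    paths = glob.glob("data/booking_*.csv")   # hypothetical file layout
    rows = dask.compute(*[one_file_stats(p) for p in paths])
    result = pd.DataFrame(list(rows))

    Here each task does real I/O plus a full reduction, so the parallelism has a chance to pay off.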
    

    Final thoughts

    In the first scenario, dask starts to make sense once the dataframe no longer fits in memory.
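
    For instance, the same aggregation runs unchanged on a dataset spread over many parquet files that pandas could not load at once (a sketch, assuming a hypothetical directory big_df_parq/ of parquet files):

    import dask.dataframe as dd

    df = dd.read_parquet("big_df_parq/")   # lazy; partitions are read as needed
    out = (df.groupby("id")
             .agg({"value": ["min", "max", "std", "mean"]})
             .compute())                   # only the small aggregated result is materialised

    pandas would have to read the whole dataset into RAM before the groupby, while dask processes it partition by partition.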